From be86c23f0586960096a2410d74102dc2cddb411d Mon Sep 17 00:00:00 2001 From: huzheng Date: Fri, 29 Dec 2017 15:55:28 +0800 Subject: [PATCH] HBASE-19661 Replace ReplicationStateZKBase with ZKReplicationStorageBase --- .../hbase/replication/ReplicationFactory.java | 6 +- .../hbase/replication/ReplicationStateZKBase.java | 153 --------------------- .../replication/ReplicationTrackerZKImpl.java | 12 +- .../replication/ZKReplicationPeerStorage.java | 24 +--- .../replication/ZKReplicationStorageBase.java | 66 ++++++++- .../org/apache/hadoop/hbase/master/HMaster.java | 4 +- .../master/cleaner/ReplicationZKNodeCleaner.java | 10 +- .../master/ReplicationPeerConfigUpgrader.java | 87 ++++++------ .../regionserver/DumpReplicationQueues.java | 17 +-- .../replication/regionserver/Replication.java | 3 +- .../replication/TestReplicationTrackerZKImpl.java | 2 +- .../replication/master/TestTableCFsUpdater.java | 9 +- .../regionserver/TestReplicationSourceManager.java | 6 +- 13 files changed, 130 insertions(+), 269 deletions(-) delete mode 100644 hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java index 6c66aff..193cf5f 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hbase.replication; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.yetus.audience.InterfaceAudience; @@ -33,9 +32,8 @@ public class ReplicationFactory { return new ReplicationPeers(zk, conf); } - public static ReplicationTracker getReplicationTracker(ZKWatcher zookeeper, - final ReplicationPeers replicationPeers, Configuration conf, Abortable abortable, + public static ReplicationTracker getReplicationTracker(ZKWatcher zookeeper, Configuration conf, Stoppable stopper) { - return new ReplicationTrackerZKImpl(zookeeper, replicationPeers, conf, abortable, stopper); + return new ReplicationTrackerZKImpl(zookeeper, conf, stopper); } } diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java deleted file mode 100644 index edbbd4b..0000000 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java +++ /dev/null @@ -1,153 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.replication; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.util.List; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Abortable; -import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; -import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream; -import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; -import org.apache.hadoop.hbase.zookeeper.ZKConfig; -import org.apache.hadoop.hbase.zookeeper.ZKUtil; -import org.apache.hadoop.hbase.zookeeper.ZKWatcher; -import org.apache.hadoop.hbase.zookeeper.ZNodePaths; -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.zookeeper.KeeperException; - -/** - * This is a base class for maintaining replication state in zookeeper. - */ -@InterfaceAudience.Private -public abstract class ReplicationStateZKBase { - - /** - * The name of the znode that contains the replication status of a remote slave (i.e. peer) - * cluster. - */ - protected final String peerStateNodeName; - /** The name of the base znode that contains all replication state. */ - protected final String replicationZNode; - /** The name of the znode that contains a list of all remote slave (i.e. peer) clusters. */ - protected final String peersZNode; - /** The name of the znode that contains all replication queues */ - protected final String queuesZNode; - /** The name of the znode that contains queues of hfile references to be replicated */ - protected final String hfileRefsZNode; - /** The cluster key of the local cluster */ - protected final String ourClusterKey; - /** The name of the znode that contains tableCFs */ - protected final String tableCFsNodeName; - - protected final ZKWatcher zookeeper; - protected final Configuration conf; - protected final Abortable abortable; - - public static final byte[] ENABLED_ZNODE_BYTES = - toByteArray(ReplicationProtos.ReplicationState.State.ENABLED); - public static final byte[] DISABLED_ZNODE_BYTES = - toByteArray(ReplicationProtos.ReplicationState.State.DISABLED); - public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY = - "zookeeper.znode.replication.hfile.refs"; - public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT = "hfile-refs"; - - public ReplicationStateZKBase(ZKWatcher zookeeper, Configuration conf, - Abortable abortable) { - this.zookeeper = zookeeper; - this.conf = conf; - this.abortable = abortable; - - String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication"); - String peersZNodeName = conf.get("zookeeper.znode.replication.peers", "peers"); - String queuesZNodeName = conf.get("zookeeper.znode.replication.rs", "rs"); - String hfileRefsZNodeName = conf.get(ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY, - ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT); - this.peerStateNodeName = conf.get("zookeeper.znode.replication.peers.state", "peer-state"); - this.tableCFsNodeName = conf.get("zookeeper.znode.replication.peers.tableCFs", "tableCFs"); - this.ourClusterKey = ZKConfig.getZooKeeperClusterKey(this.conf); - this.replicationZNode = ZNodePaths.joinZNode(this.zookeeper.znodePaths.baseZNode, - replicationZNodeName); - this.peersZNode = ZNodePaths.joinZNode(replicationZNode, peersZNodeName); - this.queuesZNode = ZNodePaths.joinZNode(replicationZNode, queuesZNodeName); - this.hfileRefsZNode = ZNodePaths.joinZNode(replicationZNode, hfileRefsZNodeName); - } - - public List getListOfReplicators() { - List result = null; - try { - result = ZKUtil.listChildrenNoWatch(this.zookeeper, this.queuesZNode); - } catch (KeeperException e) { - this.abortable.abort("Failed to get list of replicators", e); - } - return result; - } - - /** - * @param state - * @return Serialized protobuf of state with pb magic prefix prepended suitable for - * use as content of a peer-state znode under a peer cluster id as in - * /hbase/replication/peers/PEER_ID/peer-state. - */ - protected static byte[] toByteArray(final ReplicationProtos.ReplicationState.State state) { - ReplicationProtos.ReplicationState msg = - ReplicationProtos.ReplicationState.newBuilder().setState(state).build(); - // There is no toByteArray on this pb Message? - // 32 bytes is default which seems fair enough here. - try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { - CodedOutputStream cos = CodedOutputStream.newInstance(baos, 16); - msg.writeTo(cos); - cos.flush(); - baos.flush(); - return ProtobufUtil.prependPBMagic(baos.toByteArray()); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - protected boolean peerExists(String id) throws KeeperException { - return ZKUtil.checkExists(this.zookeeper, ZNodePaths.joinZNode(this.peersZNode, id)) >= 0; - } - - /** - * Determine if a ZK path points to a peer node. - * @param path path to be checked - * @return true if the path points to a peer node, otherwise false - */ - protected boolean isPeerPath(String path) { - return path.split("/").length == peersZNode.split("/").length + 1; - } - - @VisibleForTesting - protected String getTableCFsNode(String id) { - return ZNodePaths.joinZNode(this.peersZNode, ZNodePaths.joinZNode(id, this.tableCFsNodeName)); - } - - @VisibleForTesting - protected String getPeerStateNode(String id) { - return ZNodePaths.joinZNode(this.peersZNode, ZNodePaths.joinZNode(id, this.peerStateNodeName)); - } - @VisibleForTesting - protected String getPeerNode(String id) { - return ZNodePaths.joinZNode(this.peersZNode, id); - } -} diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerZKImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerZKImpl.java index 5659e4b..875b2c7 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerZKImpl.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerZKImpl.java @@ -25,7 +25,6 @@ import org.apache.hadoop.hbase.zookeeper.ZKListener; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.zookeeper.KeeperException; @@ -38,7 +37,8 @@ import org.slf4j.LoggerFactory; * interface. */ @InterfaceAudience.Private -public class ReplicationTrackerZKImpl extends ReplicationStateZKBase implements ReplicationTracker { +public class ReplicationTrackerZKImpl extends ZKReplicationStorageBase + implements ReplicationTracker { private static final Logger LOG = LoggerFactory.getLogger(ReplicationTrackerZKImpl.class); // All about stopping @@ -48,9 +48,8 @@ public class ReplicationTrackerZKImpl extends ReplicationStateZKBase implements // List of all the other region servers in this cluster private final ArrayList otherRegionServers = new ArrayList<>(); - public ReplicationTrackerZKImpl(ZKWatcher zookeeper, final ReplicationPeers replicationPeers, - Configuration conf, Abortable abortable, Stoppable stopper) { - super(zookeeper, conf, abortable); + public ReplicationTrackerZKImpl(ZKWatcher zookeeper, Configuration conf, Stoppable stopper) { + super(zookeeper, conf); this.stopper = stopper; this.zookeeper.registerListener(new OtherRegionServerWatcher(this.zookeeper)); // watch the changes @@ -184,7 +183,8 @@ public class ReplicationTrackerZKImpl extends ReplicationStateZKBase implements result = ZKUtil.listChildrenNoWatch(this.zookeeper, this.zookeeper.znodePaths.rsZNode); } } catch (KeeperException e) { - this.abortable.abort("Get list of registered region servers", e); + LOG.error("Catch exception: ", e); + stopper.stop("Failed to get list of register region servers"); } return result; } diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java index bf448e8..684ec6f 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java @@ -30,8 +30,6 @@ import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.hadoop.hbase.zookeeper.ZNodePaths; import org.apache.yetus.audience.InterfaceAudience; import org.apache.zookeeper.KeeperException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; @@ -39,41 +37,21 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; * ZK based replication peer storage. */ @InterfaceAudience.Private -class ZKReplicationPeerStorage extends ZKReplicationStorageBase implements ReplicationPeerStorage { - - private static final Logger LOG = LoggerFactory.getLogger(ZKReplicationPeerStorage.class); +public class ZKReplicationPeerStorage extends ZKReplicationStorageBase implements ReplicationPeerStorage { public static final byte[] ENABLED_ZNODE_BYTES = toByteArray(ReplicationProtos.ReplicationState.State.ENABLED); public static final byte[] DISABLED_ZNODE_BYTES = toByteArray(ReplicationProtos.ReplicationState.State.DISABLED); - /** - * The name of the znode that contains the replication status of a remote slave (i.e. peer) - * cluster. - */ - private final String peerStateNodeName; - - /** - * The name of the znode that contains a list of all remote slave (i.e. peer) clusters. - */ - private final String peersZNode; - public ZKReplicationPeerStorage(ZKWatcher zookeeper, Configuration conf) { super(zookeeper, conf); - this.peerStateNodeName = conf.get("zookeeper.znode.replication.peers.state", "peer-state"); - String peersZNodeName = conf.get("zookeeper.znode.replication.peers", "peers"); - this.peersZNode = ZNodePaths.joinZNode(replicationZNode, peersZNodeName); } private String getPeerStateNode(String peerId) { return ZNodePaths.joinZNode(getPeerNode(peerId), peerStateNodeName); } - private String getPeerNode(String peerId) { - return ZNodePaths.joinZNode(peersZNode, peerId); - } - @Override public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) throws ReplicationException { diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java index d09a56b..09ca175 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java @@ -21,6 +21,8 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.hbase.zookeeper.ZKConfig; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.hadoop.hbase.zookeeper.ZNodePaths; import org.apache.yetus.audience.InterfaceAudience; @@ -34,21 +36,63 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; * zookeeper. */ @InterfaceAudience.Private -class ZKReplicationStorageBase { +public class ZKReplicationStorageBase { - /** The name of the base znode that contains all replication state. */ - protected final String replicationZNode; + private static final String REPLICATION_ZNODE_NAME = "zookeeper.znode.replication"; + public static final String REPLICATION_ZNODE_DEFAULT = "replication"; + + public static final String PEER_ZNODE_NAME = REPLICATION_ZNODE_NAME + ".peers"; + public static final String PEER_ZNODE_DEFAULT = "peers"; + + public static final String QUEUES_ZNODE_NAME = REPLICATION_ZNODE_NAME + ".rs"; + public static final String QUEUES_ZNODE_DEFAULT = "rs"; + + public static final String HFILE_REFS_ZNODE = REPLICATION_ZNODE_NAME + ".hfile.refs"; + public static final String HFILE_REFS_ZNODE_DEFAULT = "hfile-refs"; + public static final String PEERS_STATE_ZNODE = REPLICATION_ZNODE_NAME + ".peers.state"; + public static final String PEERS_STATE_ZNODE_DEFAULT = "peer-state"; + + public static final String TABLE_CFS_ZNODE = REPLICATION_ZNODE_NAME + ".peers.tableCFs"; + public static final String TABLE_CFS_ZNODE_DEFAULT = "tableCFs"; + + /** The name of the base znode that contains all replication state. */ protected final ZKWatcher zookeeper; protected final Configuration conf; + /** The name of the znode that contains peer state **/ + protected final String peerStateNodeName; + /** The name of the base znode that contains all replication state. */ + protected final String replicationZNode; + /** The name of the znode that contains a list of all remote slave (i.e. peer) clusters. */ + protected final String peersZNode; + /** The name of the znode that contains all replication queues */ + protected final String queuesZNode; + /** The name of the znode that contains queues of hfile references to be replicated */ + protected final String hfileRefsZNode; + /** The cluster key of the local cluster */ + protected final String ourClusterKey; + /** The name of the znode that contains tableCFs */ + protected final String tableCFsNodeName; + protected ZKReplicationStorageBase(ZKWatcher zookeeper, Configuration conf) { this.zookeeper = zookeeper; this.conf = conf; - String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication"); + + String replicationZNodeName = conf.get(REPLICATION_ZNODE_NAME, REPLICATION_ZNODE_DEFAULT); + String peersZNodeName = conf.get(PEER_ZNODE_NAME, PEER_ZNODE_DEFAULT); + String queuesZNodeName = conf.get(QUEUES_ZNODE_NAME, QUEUES_ZNODE_DEFAULT); + String hFileRefsZNodeName = conf.get(HFILE_REFS_ZNODE, HFILE_REFS_ZNODE_DEFAULT); + + this.peerStateNodeName = conf.get(PEERS_STATE_ZNODE, PEERS_STATE_ZNODE_DEFAULT); + this.tableCFsNodeName = conf.get(TABLE_CFS_ZNODE, TABLE_CFS_ZNODE_DEFAULT); + this.ourClusterKey = ZKConfig.getZooKeeperClusterKey(this.conf); this.replicationZNode = - ZNodePaths.joinZNode(this.zookeeper.znodePaths.baseZNode, replicationZNodeName); + ZNodePaths.joinZNode(zookeeper.znodePaths.baseZNode, replicationZNodeName); + this.peersZNode = ZNodePaths.joinZNode(replicationZNode, peersZNodeName); + this.queuesZNode = ZNodePaths.joinZNode(replicationZNode, queuesZNodeName); + this.hfileRefsZNode = ZNodePaths.joinZNode(replicationZNode, hFileRefsZNodeName); } /** @@ -58,7 +102,7 @@ class ZKReplicationStorageBase { */ protected static byte[] toByteArray(final ReplicationProtos.ReplicationState.State state) { ReplicationProtos.ReplicationState msg = - ReplicationProtos.ReplicationState.newBuilder().setState(state).build(); + ReplicationProtos.ReplicationState.newBuilder().setState(state).build(); // There is no toByteArray on this pb Message? // 32 bytes is default which seems fair enough here. try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { @@ -71,4 +115,14 @@ class ZKReplicationStorageBase { throw new RuntimeException(e); } } + + @VisibleForTesting + public String getTableCFsNode(String id) { + return ZNodePaths.joinZNode(this.peersZNode, ZNodePaths.joinZNode(id, this.tableCFsNodeName)); + } + + @VisibleForTesting + public String getPeerNode(String id) { + return ZNodePaths.joinZNode(this.peersZNode, id); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 468ff89..68006a1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -806,8 +806,8 @@ public class HMaster extends HRegionServer implements MasterServices { // This is for backwards compatibility // See HBASE-11393 status.setStatus("Update TableCFs node in ZNode"); - ReplicationPeerConfigUpgrader tableCFsUpdater = new ReplicationPeerConfigUpgrader(zooKeeper, - conf, this.clusterConnection); + ReplicationPeerConfigUpgrader tableCFsUpdater = + new ReplicationPeerConfigUpgrader(zooKeeper, conf); tableCFsUpdater.copyTableCFs(); // Add the Observer to delete space quotas on table deletion before starting all CPs by diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleaner.java index f2c3ec9..b295e14 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleaner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleaner.java @@ -33,8 +33,8 @@ import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeerStorage; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; -import org.apache.hadoop.hbase.replication.ReplicationStateZKBase; import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; +import org.apache.hadoop.hbase.replication.ZKReplicationStorageBase; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.hadoop.hbase.zookeeper.ZNodePaths; @@ -57,7 +57,7 @@ public class ReplicationZKNodeCleaner { throws IOException { this.queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf); this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(zkw, conf); - this.queueDeletor = new ReplicationQueueDeletor(zkw, conf, abortable); + this.queueDeletor = new ReplicationQueueDeletor(zkw, conf); } /** @@ -114,10 +114,10 @@ public class ReplicationZKNodeCleaner { return undeletedHFileRefsQueue; } - private class ReplicationQueueDeletor extends ReplicationStateZKBase { + private class ReplicationQueueDeletor extends ZKReplicationStorageBase { - ReplicationQueueDeletor(ZKWatcher zk, Configuration conf, Abortable abortable) { - super(zk, conf, abortable); + ReplicationQueueDeletor(ZKWatcher zk, Configuration conf) { + super(zk, conf); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationPeerConfigUpgrader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationPeerConfigUpgrader.java index ea5509f..81628f1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationPeerConfigUpgrader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationPeerConfigUpgrader.java @@ -19,18 +19,19 @@ package org.apache.hadoop.hbase.replication.master; import java.io.IOException; -import java.util.List; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; import org.apache.hadoop.hbase.exceptions.DeserializationException; +import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; -import org.apache.hadoop.hbase.replication.ReplicationStateZKBase; +import org.apache.hadoop.hbase.replication.ReplicationPeerStorage; +import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; +import org.apache.hadoop.hbase.replication.ZKReplicationStorageBase; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.yetus.audience.InterfaceAudience; @@ -41,52 +42,48 @@ import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; /** - * This class is used to upgrade TableCFs from HBase 1.0, 1.1, 1.2, 1.3 to HBase 1.4 or 2.x. - * It will be removed in HBase 3.x. See HBASE-11393 + * This class is used to upgrade TableCFs from HBase 1.0, 1.1, 1.2, 1.3 to HBase 1.4 or 2.x. It will + * be removed in HBase 3.x. See HBASE-11393 */ @InterfaceAudience.Private @InterfaceStability.Unstable -public class ReplicationPeerConfigUpgrader extends ReplicationStateZKBase { +public class ReplicationPeerConfigUpgrader extends ZKReplicationStorageBase { private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeerConfigUpgrader.class); + private final Configuration conf; + private final ZKWatcher zookeeper; + private final ReplicationPeerStorage peerStorage; - public ReplicationPeerConfigUpgrader(ZKWatcher zookeeper, - Configuration conf, Abortable abortable) { - super(zookeeper, conf, abortable); + public ReplicationPeerConfigUpgrader(ZKWatcher zookeeper, Configuration conf) { + super(zookeeper, conf); + this.zookeeper = zookeeper; + this.conf = conf; + this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(zookeeper, conf); } public void upgrade() throws Exception { try (Connection conn = ConnectionFactory.createConnection(conf)) { Admin admin = conn.getAdmin(); - admin.listReplicationPeers().forEach( - (peerDesc) -> { - String peerId = peerDesc.getPeerId(); - ReplicationPeerConfig peerConfig = peerDesc.getPeerConfig(); - if ((peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty()) - || (peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())) { - peerConfig.setReplicateAllUserTables(false); - try { - admin.updateReplicationPeerConfig(peerId, peerConfig); - } catch (Exception e) { - LOG.error("Failed to upgrade replication peer config for peerId=" + peerId, e); - } + admin.listReplicationPeers().forEach((peerDesc) -> { + String peerId = peerDesc.getPeerId(); + ReplicationPeerConfig peerConfig = peerDesc.getPeerConfig(); + if ((peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty()) + || (peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())) { + peerConfig.setReplicateAllUserTables(false); + try { + admin.updateReplicationPeerConfig(peerId, peerConfig); + } catch (Exception e) { + LOG.error("Failed to upgrade replication peer config for peerId=" + peerId, e); } - }); + } + }); } } - public void copyTableCFs() { - List znodes = null; - try { - znodes = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode); - } catch (KeeperException e) { - LOG.error("Failed to get peers znode", e); - } - if (znodes != null) { - for (String peerId : znodes) { - if (!copyTableCFs(peerId)) { - LOG.error("upgrade tableCFs failed for peerId=" + peerId); - } + public void copyTableCFs() throws ReplicationException { + for (String peerId : peerStorage.listPeerIds()) { + if (!copyTableCFs(peerId)) { + LOG.error("upgrade tableCFs failed for peerId=" + peerId); } } } @@ -96,18 +93,16 @@ public class ReplicationPeerConfigUpgrader extends ReplicationStateZKBase { try { if (ZKUtil.checkExists(zookeeper, tableCFsNode) != -1) { String peerNode = getPeerNode(peerId); - ReplicationPeerConfig rpc = getReplicationPeerConig(peerNode); + ReplicationPeerConfig rpc = getReplicationPeerConfig(peerNode); // We only need to copy data from tableCFs node to rpc Node the first time hmaster start. if (rpc.getTableCFsMap() == null || rpc.getTableCFsMap().isEmpty()) { // we copy TableCFs node into PeerNode LOG.info("copy tableCFs into peerNode:" + peerId); ReplicationProtos.TableCF[] tableCFs = - ReplicationPeerConfigUtil.parseTableCFs( - ZKUtil.getData(this.zookeeper, tableCFsNode)); + ReplicationPeerConfigUtil.parseTableCFs(ZKUtil.getData(this.zookeeper, tableCFsNode)); if (tableCFs != null && tableCFs.length > 0) { rpc.setTableCFsMap(ReplicationPeerConfigUtil.convert2Map(tableCFs)); - ZKUtil.setData(this.zookeeper, peerNode, - ReplicationPeerConfigUtil.toByteArray(rpc)); + ZKUtil.setData(this.zookeeper, peerNode, ReplicationPeerConfigUtil.toByteArray(rpc)); } } else { LOG.info("No tableCFs in peerNode:" + peerId); @@ -126,13 +121,13 @@ public class ReplicationPeerConfigUpgrader extends ReplicationStateZKBase { return true; } - private ReplicationPeerConfig getReplicationPeerConig(String peerNode) - throws KeeperException, InterruptedException { + private ReplicationPeerConfig getReplicationPeerConfig(String peerNode) + throws KeeperException, InterruptedException { byte[] data = null; data = ZKUtil.getData(this.zookeeper, peerNode); if (data == null) { - LOG.error("Could not get configuration for " + - "peer because it doesn't exist. peer=" + peerNode); + LOG.error( + "Could not get configuration for " + "peer because it doesn't exist. peer=" + peerNode); return null; } try { @@ -165,8 +160,8 @@ public class ReplicationPeerConfigUpgrader extends ReplicationStateZKBase { Configuration conf = HBaseConfiguration.create(); ZKWatcher zkw = new ZKWatcher(conf, "ReplicationPeerConfigUpgrader", null); try { - ReplicationPeerConfigUpgrader tableCFsUpdater = new ReplicationPeerConfigUpgrader(zkw, - conf, null); + ReplicationPeerConfigUpgrader tableCFsUpdater = + new ReplicationPeerConfigUpgrader(zkw, conf); tableCFsUpdater.copyTableCFs(); } finally { zkw.close(); @@ -174,7 +169,7 @@ public class ReplicationPeerConfigUpgrader extends ReplicationStateZKBase { } else if (args[0].equals("upgrade")) { Configuration conf = HBaseConfiguration.create(); ZKWatcher zkw = new ZKWatcher(conf, "ReplicationPeerConfigUpgrader", null); - ReplicationPeerConfigUpgrader upgrader = new ReplicationPeerConfigUpgrader(zkw, conf, null); + ReplicationPeerConfigUpgrader upgrader = new ReplicationPeerConfigUpgrader(zkw, conf); upgrader.upgrade(); } else { printUsageAndExit(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java index 632f6c7..b345b86 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java @@ -238,7 +238,7 @@ public class DumpReplicationQueues extends Configured implements Tool { LOG.info("Found [--distributed], will poll each RegionServer."); Set peerIds = peers.stream().map((peer) -> peer.getPeerId()) .collect(Collectors.toSet()); - System.out.println(dumpQueues(connection, zkw, peerIds, opts.isHdfs())); + System.out.println(dumpQueues(zkw, peerIds, opts.isHdfs())); System.out.println(dumpReplicationSummary()); } else { // use ZK instead @@ -302,18 +302,15 @@ public class DumpReplicationQueues extends Configured implements Tool { return sb.toString(); } - public String dumpQueues(ClusterConnection connection, ZKWatcher zkw, Set peerIds, + public String dumpQueues(ZKWatcher zkw, Set peerIds, boolean hdfs) throws Exception { ReplicationQueueStorage queueStorage; - ReplicationPeers replicationPeers; ReplicationTracker replicationTracker; StringBuilder sb = new StringBuilder(); queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf()); - replicationPeers = - ReplicationFactory.getReplicationPeers(zkw, getConf()); - replicationTracker = ReplicationFactory.getReplicationTracker(zkw, replicationPeers, getConf(), - new WarnOnlyAbortable(), new WarnOnlyStoppable()); + replicationTracker = + ReplicationFactory.getReplicationTracker(zkw, getConf(), new WarnOnlyStoppable()); Set liveRegionServers = new HashSet<>(replicationTracker.getListOfRegionServers()); // Loops each peer on each RS and dumps the queues @@ -331,11 +328,9 @@ public class DumpReplicationQueues extends Configured implements Tool { List wals = queueStorage.getWALsInQueue(regionserver, queueId); if (!peerIds.contains(queueInfo.getPeerId())) { deletedQueues.add(regionserver + "/" + queueId); - sb.append( - formatQueue(regionserver, queueStorage, queueInfo, queueId, wals, true, hdfs)); + sb.append(formatQueue(regionserver, queueStorage, queueInfo, queueId, wals, true, hdfs)); } else { - sb.append( - formatQueue(regionserver, queueStorage, queueInfo, queueId, wals, false, hdfs)); + sb.append(formatQueue(regionserver, queueStorage, queueInfo, queueId, wals, false, hdfs)); } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java index 7dae631..1b237fa 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java @@ -133,8 +133,7 @@ public class Replication implements ReplicationFactory.getReplicationPeers(server.getZooKeeper(), this.conf); this.replicationPeers.init(); this.replicationTracker = - ReplicationFactory.getReplicationTracker(server.getZooKeeper(), this.replicationPeers, - this.conf, this.server, this.server); + ReplicationFactory.getReplicationTracker(server.getZooKeeper(), this.conf, this.server); } catch (Exception e) { throw new IOException("Failed replication handler create", e); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java index fdfa6b7..b85105d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java @@ -90,7 +90,7 @@ public class TestReplicationTrackerZKImpl { ZKClusterId.setClusterId(zkw, new ClusterId()); rp = ReplicationFactory.getReplicationPeers(zkw, conf); rp.init(); - rt = ReplicationFactory.getReplicationTracker(zkw, rp, conf, zkw, new DummyServer(fakeRs1)); + rt = ReplicationFactory.getReplicationTracker(zkw, conf, new DummyServer(fakeRs1)); } catch (Exception e) { fail("Exception during test setup: " + e); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestTableCFsUpdater.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestTableCFsUpdater.java index 2993043..b76a795 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestTableCFsUpdater.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestTableCFsUpdater.java @@ -25,14 +25,12 @@ import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; -import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; -import org.apache.zookeeper.KeeperException; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Rule; @@ -62,7 +60,7 @@ public class TestTableCFsUpdater extends ReplicationPeerConfigUpgrader { public TestName name = new TestName(); public TestTableCFsUpdater() { - super(zkw, TEST_UTIL.getConfiguration(), abortable); + super(zkw, TEST_UTIL.getConfiguration()); } @BeforeClass @@ -89,8 +87,7 @@ public class TestTableCFsUpdater extends ReplicationPeerConfigUpgrader { } @Test - public void testUpgrade() throws KeeperException, InterruptedException, - DeserializationException { + public void testUpgrade() throws Exception { String peerId = "1"; final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1"); final TableName tableName2 = TableName.valueOf(name.getMethodName() + "2"); @@ -211,6 +208,4 @@ public class TestTableCFsUpdater extends ReplicationPeerConfigUpgrader { tableNameListMap = actualRpc.getTableCFsMap(); assertNull(tableNameListMap); } - - } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index 768a24e..b744eed 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -69,8 +69,8 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationSourceDummy; -import org.apache.hadoop.hbase.replication.ReplicationStateZKBase; import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; +import org.apache.hadoop.hbase.replication.ZKReplicationPeerStorage; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; @@ -163,9 +163,9 @@ public abstract class TestReplicationSourceManager { + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":/1")); ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/peer-state"); ZKUtil.setData(zkw, "/hbase/replication/peers/1/peer-state", - ReplicationStateZKBase.ENABLED_ZNODE_BYTES); + ZKReplicationPeerStorage.ENABLED_ZNODE_BYTES); ZKUtil.createWithParents(zkw, "/hbase/replication/state"); - ZKUtil.setData(zkw, "/hbase/replication/state", ReplicationStateZKBase.ENABLED_ZNODE_BYTES); + ZKUtil.setData(zkw, "/hbase/replication/state", ZKReplicationPeerStorage.ENABLED_ZNODE_BYTES); ZKClusterId.setClusterId(zkw, new ClusterId()); FSUtils.setRootDir(utility.getConfiguration(), utility.getDataTestDir()); -- 2.3.2 (Apple Git-55)