From ee7b557a6ca4ec4ad470c4391ddf8f95d229cb77 Mon Sep 17 00:00:00 2001 From: Guanghao Zhang Date: Fri, 28 Oct 2016 13:20:50 +0800 Subject: [PATCH] HBASE-16947 Some improvements for DumpReplicationQueues tool --- .../replication/ReplicationQueuesClientZKImpl.java | 4 +- .../hbase/replication/ReplicationQueuesZKImpl.java | 8 +- .../regionserver/DumpReplicationQueues.java | 123 ++++++++++++++++----- 3 files changed, 102 insertions(+), 33 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java index cc407e3..9078e40 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java @@ -40,7 +40,9 @@ public class ReplicationQueuesClientZKImpl extends ReplicationStateZKBase implem @Override public void init() throws ReplicationException { try { - ZKUtil.createWithParents(this.zookeeper, this.queuesZNode); + if (ZKUtil.checkExists(this.zookeeper, this.queuesZNode) < 0) { + ZKUtil.createWithParents(this.zookeeper, this.queuesZNode); + } } catch (KeeperException e) { throw new ReplicationException("Internal error while initializing a queues client", e); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java index c7af78e..9beaba7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java @@ -82,14 +82,18 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R public void init(String serverName) throws ReplicationException { this.myQueuesZnode = ZKUtil.joinZNode(this.queuesZNode, serverName); try { - ZKUtil.createWithParents(this.zookeeper, this.myQueuesZnode); + if (ZKUtil.checkExists(this.zookeeper, this.myQueuesZnode) < 0) { + ZKUtil.createWithParents(this.zookeeper, this.myQueuesZnode); + } } catch (KeeperException e) { throw new ReplicationException("Could not initialize replication queues.", e); } if (conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT)) { try { - ZKUtil.createWithParents(this.zookeeper, this.hfileRefsZNode); + if (ZKUtil.checkExists(this.zookeeper, this.hfileRefsZNode) < 0) { + ZKUtil.createWithParents(this.zookeeper, this.hfileRefsZNode); + } } catch (KeeperException e) { throw new ReplicationException("Could not initialize hfile references replication queue.", e); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java index bf38d6f..0772f89 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java @@ -40,6 +40,9 @@ import org.apache.hadoop.util.ToolRunner; import org.apache.zookeeper.KeeperException; import org.mortbay.util.IO; +import com.google.common.util.concurrent.AtomicLongMap; + +import java.io.FileNotFoundException; import java.io.IOException; import java.util.*; @@ -55,6 +58,20 @@ public class DumpReplicationQueues extends Configured implements Tool { private static final Log LOG = LogFactory.getLog(DumpReplicationQueues.class.getName()); + private List deadRegionServers; + private List deletedQueues; + private AtomicLongMap peersQueueSize; + private long totalSizeOfWALs; + private long numWalsNotFound; + + public DumpReplicationQueues() { + deadRegionServers = new ArrayList(); + deletedQueues = new ArrayList(); + peersQueueSize = AtomicLongMap.create(); + totalSizeOfWALs = 0; + numWalsNotFound = 0; + } + static class DumpOptions { boolean hdfs = false; boolean distributed = false; @@ -155,13 +172,14 @@ public class DumpReplicationQueues extends Configured implements Tool { if (message != null && message.length() > 0) { System.err.println(message); } - System.err.println("Usage: java " + className + " \\"); + System.err.println("Usage: bin/hbase " + className + " \\"); System.err.println(" [-D]*"); System.err.println(); System.err.println("General Options:"); - System.err.println(" distributed Poll each RS and print its own replication queue. " + System.err.println(" -h|--h|--help Show this help and exit."); + System.err.println(" --distributed Poll each RS and print its own replication queue. " + "Default only polls ZooKeeper"); - System.err.println(" hdfs Use HDFS to calculate usage of WALs by replication. It could be overestimated" + System.err.println(" --hdfs Use HDFS to calculate usage of WALs by replication. It could be overestimated" + " if replicating to multiple peers. --distributed flag is also needed."); } @@ -176,7 +194,6 @@ public class DumpReplicationQueues extends Configured implements Tool { HBaseAdmin.checkHBaseAvailable(conf); ReplicationAdmin replicationAdmin = new ReplicationAdmin(conf); ClusterConnection connection = (ClusterConnection) ConnectionFactory.createConnection(conf); - long deleted = 0; ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "DumpReplicationQueues" + System.currentTimeMillis(), new WarnOnlyAbortable(), true); @@ -203,11 +220,8 @@ public class DumpReplicationQueues extends Configured implements Tool { if (opts.isDistributed()) { LOG.info("Found [--distributed], will poll each RegionServer."); - System.out.println(dumpQueues(connection, zkw, opts.isHdfs(), deleted)); - if (deleted > 0) { - LOG.warn("Found " + deleted +" deleted queues" - + ", run hbck -fixReplication in order to remove the deleted replication queues"); - } + System.out.println(dumpQueues(connection, peerConfigs.keySet(), zkw, opts.isHdfs())); + System.out.println(dumpReplicationSummary()); } else { // use ZK instead System.out.print("Dumping replication znodes via ZooKeeper:"); @@ -221,21 +235,52 @@ public class DumpReplicationQueues extends Configured implements Tool { } } + public String dumpReplicationSummary() { + StringBuilder sb = new StringBuilder(); + if (!deletedQueues.isEmpty()) { + sb.append("Found " + deletedQueues.size() + " deleted queues" + + ", run hbck -fixReplication in order to remove the deleted replication queues\n"); + for (String deletedQueue : deletedQueues) { + sb.append(" " + deletedQueue + "\n"); + } + } + if (!deadRegionServers.isEmpty()) { + sb.append("Found " + deadRegionServers.size() + " dead regionservers" + + ", restart one regionserver to transfer the queues of dead regionservers\n"); + for (String deadRs : deadRegionServers) { + sb.append(" " + deadRs + "\n"); + } + } + if (!peersQueueSize.isEmpty()) { + sb.append("Dumping all peers's number of WALs in replication queue\n"); + for (Map.Entry entry : peersQueueSize.asMap().entrySet()) { + sb.append(" PeerId: " + entry.getKey() + " , sizeOfLogQueue: " + entry.getValue() + "\n"); + } + } + sb.append(" Total size of WALs on HDFS: " + StringUtils.humanSize(totalSizeOfWALs) + "\n"); + if (numWalsNotFound > 0) { + sb.append(" ERROR: There are " + numWalsNotFound + " WALs not found!!!\n"); + } + return sb.toString(); + } + public String dumpPeersState(ReplicationAdmin replicationAdmin, - Map peerConfigs) throws Exception { + Map peerConfigs) throws Exception { Map currentConf; StringBuilder sb = new StringBuilder(); for (Map.Entry peer : peerConfigs.entrySet()) { try { + ReplicationPeerConfig peerConfig = peer.getValue(); sb.append("Peer: " + peer.getKey() + "\n"); - sb.append(" " + "State: " + (replicationAdmin.getPeerState(peer.getKey()) ? "ENABLED" : "DISABLED") + "\n"); - sb.append(" " + "Cluster Name: " + peer.getValue() + "\n"); - currentConf = peer.getValue().getConfiguration(); + sb.append(" " + "State: " + + (replicationAdmin.getPeerState(peer.getKey()) ? "ENABLED" : "DISABLED") + "\n"); + sb.append(" " + "Cluster Name: " + peerConfig.getClusterKey() + "\n"); + currentConf = peerConfig.getConfiguration(); // Only show when we have a custom configuration for the peer if (currentConf.size() > 1) { sb.append(" " + "Peer Configuration: " + currentConf + "\n"); } - sb.append(" " + "Peer Table CFs: " + replicationAdmin.getPeerTableCFs(peer.getKey()) + "\n"); + sb.append(" " + "Peer Table CFs: " + peerConfig.getTableCFsMap() + "\n"); } catch (ReplicationException re) { sb.append("Got an exception while invoking ReplicationAdmin: " + re + "\n"); } @@ -243,33 +288,36 @@ public class DumpReplicationQueues extends Configured implements Tool { return sb.toString(); } - public String dumpQueues(ClusterConnection connection, ZooKeeperWatcher zkw, boolean hdfs, long deleted) - throws Exception { + public String dumpQueues(ClusterConnection connection, Set peerIds, ZooKeeperWatcher zkw, + boolean hdfs) throws Exception { ReplicationQueuesClient queuesClient; ReplicationPeers replicationPeers; ReplicationQueues replicationQueues; - + ReplicationTracker replicationTracker; StringBuilder sb = new StringBuilder(); queuesClient = ReplicationFactory.getReplicationQueuesClient(zkw, getConf(), connection); queuesClient.init(); replicationQueues = ReplicationFactory.getReplicationQueues(zkw, getConf(), connection); replicationPeers = ReplicationFactory.getReplicationPeers(zkw, getConf(), queuesClient, connection); - replicationPeers.init(); + replicationTracker = ReplicationFactory.getReplicationTracker(zkw, replicationPeers, getConf(), + new WarnOnlyAbortable(), new WarnOnlyStoppable()); + List liveRegionServers = replicationTracker.getListOfRegionServers(); // Loops each peer on each RS and dumps the queues - - Set peerIds = new HashSet(replicationPeers.getAllPeerIds()); try { List regionservers = queuesClient.getListOfReplicators(); for (String regionserver : regionservers) { List queueIds = queuesClient.getAllQueues(regionserver); replicationQueues.init(regionserver); + if (!liveRegionServers.contains(regionserver)) { + deadRegionServers.add(regionserver); + } for (String queueId : queueIds) { ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId); List wals = queuesClient.getLogsInQueue(regionserver, queueId); if (!peerIds.contains(queueInfo.getPeerId())) { - deleted++; + deletedQueues.add(regionserver + "/" + queueId); sb.append(formatQueue(regionserver, replicationQueues, queueInfo, queueId, wals, true, hdfs)); } else { sb.append(formatQueue(regionserver, replicationQueues, queueInfo, queueId, wals, false, hdfs)); @@ -282,14 +330,14 @@ public class DumpReplicationQueues extends Configured implements Tool { return sb.toString(); } - private String formatQueue(String regionserver, ReplicationQueues replicationQueues, ReplicationQueueInfo queueInfo, - String queueId, List wals, boolean isDeleted, boolean hdfs) throws Exception { - + private String formatQueue(String regionserver, ReplicationQueues replicationQueues, + ReplicationQueueInfo queueInfo, String queueId, List wals, boolean isDeleted, + boolean hdfs) throws Exception { StringBuilder sb = new StringBuilder(); - List deadServers ; sb.append("Dumping replication queue info for RegionServer: [" + regionserver + "]" + "\n"); + sb.append(" Queue znode: " + queueId + "\n"); sb.append(" PeerID: " + queueInfo.getPeerId() + "\n"); sb.append(" Recovered: " + queueInfo.isQueueRecovered() + "\n"); deadServers = queueInfo.getDeadRegionServers(); @@ -300,6 +348,8 @@ public class DumpReplicationQueues extends Configured implements Tool { } sb.append(" Was deleted: " + isDeleted + "\n"); sb.append(" Number of WALs in replication queue: " + wals.size() + "\n"); + peersQueueSize.addAndGet(queueInfo.getPeerId(), wals.size()); + for (String wal : wals) { long position = replicationQueues.getLogPosition(queueInfo.getPeerId(), wal); sb.append(" Replication position for " + wal + ": " + (position > 0 ? position : "0" @@ -324,11 +374,18 @@ public class DumpReplicationQueues extends Configured implements Tool { try { fileStatus = (new WALLink(getConf(), server, wal)).getFileStatus(fs); } catch (IOException e) { - LOG.warn("WAL " + wal + " couldn't be found, skipping"); - break; + if (e instanceof FileNotFoundException) { + numWalsNotFound++; + LOG.warn("WAL " + wal + " couldn't be found, skipping", e); + } else { + LOG.warn("Can't get file status of WAL " + wal + ", skipping", e); + } + continue; } size += fileStatus.getLen(); } + + totalSizeOfWALs += size; return size; } @@ -347,9 +404,15 @@ public class DumpReplicationQueues extends Configured implements Tool { } } - private static void usage(final String errorMsg) { - if (errorMsg != null && errorMsg.length() > 0) { - LOG.error(errorMsg); + private static class WarnOnlyStoppable implements Stoppable { + @Override + public void stop(String why) { + LOG.warn("DumpReplicationQueue received stop, ignoring. Reason: " + why); + } + + @Override + public boolean isStopped() { + return false; } } } -- 1.9.1