From 3720fbb5e4fea8fe889158475a2024c5f65e27b0 Mon Sep 17 00:00:00 2001
From: Esteban Gutierrez
Date: Thu, 18 Aug 2016 14:12:42 -0700
Subject: [PATCH] HBASE-16450 Shell tool to dump replication queues

---
 .../hbase/replication/ReplicationQueuesZKImpl.java |   2 +-
 .../org/apache/hadoop/hbase/zookeeper/ZKUtil.java  |  11 +
 .../regionserver/DumpReplicationQueues.java        | 232 +++++++++++++++++++++
 3 files changed, 244 insertions(+), 1 deletion(-)
 create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
index c1e85cb..1c579ab 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
@@ -165,7 +165,7 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
       return ZKUtil.parseWALPositionFrom(bytes);
     } catch (DeserializationException de) {
       LOG.warn("Failed to parse WALPosition for queueId=" + queueId + " and wal=" + filename
-          + "znode content, continuing.");
+          + " znode content, continuing.");
     }
     // if we can not parse the position, start at the beginning of the wal file
     // again
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
index f5b720e..f6914ae 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
@@ -1823,6 +1823,17 @@ public class ZKUtil {
     }
   }
 
+  /**
+   * Returns a string with all the replication znodes and the stored log positions.
+   * @param zkw watcher connected to the cluster's ZooKeeper ensemble
+   * @return a string of replication znodes and log positions
+   */
+  public static String getReplicationZnodesDump(ZooKeeperWatcher zkw) throws KeeperException {
+    StringBuilder sb = new StringBuilder();
+    getReplicationZnodesDump(zkw, sb);
+    return sb.toString();
+  }
+
   private static void appendRSZnodes(ZooKeeperWatcher zkw, String znode, StringBuilder sb)
       throws KeeperException {
     List<String> stack = new LinkedList<>();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
new file mode 100644
index 0000000..63af467
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
+import org.apache.hadoop.hbase.procedure2.util.StringUtils;
+import org.apache.hadoop.hbase.replication.*;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.zookeeper.KeeperException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Provides information about the existing states of replication, replication peers and queues.
+ *
+ * Usage: hbase org.apache.hadoop.hbase.replication.regionserver.DumpReplicationQueues
+ */
+public class DumpReplicationQueues extends Configured implements Tool {
+
+  private static final Log LOG = LogFactory.getLog(DumpReplicationQueues.class.getName());
+
+  private static Configuration conf;
+
+  private static ReplicationAdmin replicationAdmin;
+  private static String replicationZNode;
+  private static ClusterConnection connection;
+  private static Path oldLogDir, logDir, rootDir;
+  private static FileSystem fs;
+
+  // Although the tool is designed to be run from the command line, this API is
+  // provided so that the tool can be executed from another application.
+  public static void setConfigure(Configuration config) {
+    conf = config;
+  }
+
+  /**
+   * Main entry point.
+   *
+   * @param args command line arguments
+   * @throws Exception
+   */
+  public static void main(String[] args) throws Exception {
+    if (conf == null) {
+      conf = HBaseConfiguration.create();
+    }
+    if (connection == null) {
+      connection = (ClusterConnection) ConnectionFactory.createConnection(conf);
+    }
+    if (replicationAdmin == null) {
+      replicationAdmin = new ReplicationAdmin(conf);
+    }
+    int ret = ToolRunner.run(conf, new DumpReplicationQueues(), args);
+    System.exit(ret);
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    // Our zk watcher
+    ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf,
+        "DumpReplicationQueues" + System.currentTimeMillis(), new WarnOnlyAbortable(), true);
+
+    String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");
+    replicationZNode = ZKUtil.joinZNode(zkw.baseZNode, replicationZNodeName);
+    rootDir = FSUtils.getRootDir(conf);
+    fs = FileSystem.get(conf);
+    oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
+    logDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME);
+
+    LOG.info("Our Quorum: " + zkw.getQuorum());
+
+    List<HashMap<String, String>> replicatedTables = replicationAdmin.listReplicated();
+    if (replicatedTables.isEmpty()) {
+      LOG.info("No tables with a configured replication peer were found.");
+    } else {
+      LOG.info("Replicated Tables: " + replicatedTables);
+    }
+
+    Map<String, ReplicationPeerConfig> peerConfigs = replicationAdmin.listPeerConfigs();
+    dumpPeersState(peerConfigs);
+
+    if (dumpQueuesOrFix(zkw)) {
+      LOG.warn("Run hbck -fixReplication in order to remove the deleted replication queues");
+    }
+
+    zkw.close();
+    return 0;
+  }
+
+  private void dumpPeersState(Map<String, ReplicationPeerConfig> peerConfigs)
+      throws IOException, ReplicationException {
+    Map<String, String> currentConf;
+    for (Map.Entry<String, ReplicationPeerConfig> peer : peerConfigs.entrySet()) {
+      LOG.info("Peer: " + peer.getKey());
+      LOG.info("  State: "
+          + (replicationAdmin.getPeerState(peer.getKey()) ? "ENABLED" : "DISABLED"));
+      LOG.info("  Cluster Name: " + peer.getValue());
+      currentConf = peer.getValue().getConfiguration();
+      // Only show the configuration when the peer carries a custom one
+      if (currentConf.size() > 1) {
+        LOG.info("  Peer Configuration: " + currentConf);
+      }
+      LOG.info("  Peer Table CFs: " + replicationAdmin.getPeerTableCFs(peer.getKey()));
+    }
+  }
+
+  private boolean dumpQueuesOrFix(ZooKeeperWatcher zkw) throws Exception {
+    ReplicationQueuesClient queuesClient;
+    ReplicationPeers replicationPeers;
+    ReplicationQueues replicationQueues;
+    ReplicationQueuesClientArguments replicationArgs =
+        new ReplicationQueuesClientArguments(conf, new WarnOnlyAbortable(), zkw);
+    Map<String, List<String>> undeletedQueueIds = new HashMap<String, List<String>>();
+
+    queuesClient = ReplicationFactory.getReplicationQueuesClient(replicationArgs);
+    queuesClient.init();
+    replicationQueues = ReplicationFactory.getReplicationQueues(replicationArgs);
+    replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf, queuesClient, connection);
+    replicationPeers.init();
+
+    // Scan every peer and every existing queue
+    Set<String> peerIds = new HashSet<String>(replicationPeers.getAllPeerIds());
+    try {
+      List<String> regionservers = queuesClient.getListOfReplicators();
+      for (String regionserver : regionservers) {
+        List<String> queueIds = queuesClient.getAllQueues(regionserver);
+        replicationQueues.init(regionserver);
+        for (String queueId : queueIds) {
+          ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
+          List<String> wals = queuesClient.getLogsInQueue(regionserver, queueId);
+          LOG.info("Dumping replication queue info for RegionServer: [" + regionserver + "]");
+          LOG.info("    PeerID: " + queueInfo.getPeerId());
+          LOG.info("    Recovered: " + queueInfo.isQueueRecovered());
+          LOG.info("    Dead RegionServers: " + queueInfo.getDeadRegionServers());
+          LOG.info("    Number of WALs in replication queue: " + wals.size());
+          LOG.info("    Total Size of WALs in HDFS: "
+              + StringUtils.humanSize(getTotalWALSize(wals, regionserver)));
+          for (String wal : wals) {
+            // The position znode lives under the queueId (which only equals the peer id
+            // for non-recovered queues), so look it up by queueId.
+            long position = replicationQueues.getLogPosition(queueId, wal);
+            LOG.info("    Replication position for " + wal + ": "
+                + (position > 0 ? position : "0 (not started)"));
+          }
+
+          if (!peerIds.contains(queueInfo.getPeerId())) {
+            if (!undeletedQueueIds.containsKey(regionserver)) {
+              undeletedQueueIds.put(regionserver, new ArrayList<String>());
+            }
+            undeletedQueueIds.get(regionserver).add(queueId);
+
+            String msg = "Undeleted replication queue found: "
+                + String.format("[removedPeerId=%s, regionserver=%s, queueId=%s, wals=%s, size=%s]",
+                    queueInfo.getPeerId(), regionserver, queueId, wals.size(),
+                    StringUtils.humanSize(getTotalWALSize(wals, regionserver)));
+            LOG.warn(msg);
+          }
+        }
+      }
+    } catch (KeeperException ke) {
+      throw new IOException(ke);
+    }
+    return !undeletedQueueIds.isEmpty();
+  }
+
+  /**
+   * Returns the total size in bytes of a list of WALs.
+   */
+  private long getTotalWALSize(List<String> wals, String server) {
+    long size = 0;
+    for (String wal : wals) {
+      try {
+        Path current = new Path(logDir, new Path(server + "/" + wal));
+        Path old = new Path(oldLogDir, wal);
+        FileStatus fileStatus;
+        if (fs.exists(current)) {
+          fileStatus = fs.getFileStatus(current);
+        } else if (fs.exists(old)) {
+          fileStatus = fs.getFileStatus(old);
+        } else {
+          LOG.warn("WAL " + wal + " couldn't be found, skipping");
+          continue;
+        }
+        size += fileStatus.getLen();
+      } catch (IOException e) {
+        LOG.warn("Caught an exception while accessing " + wal + ", skipping this WAL: " + e);
+      }
+    }
+    return size;
+  }
+
+  private static class WarnOnlyAbortable implements Abortable {
+
+    @Override
+    public void abort(String why, Throwable e) {
+      LOG.warn("DumpReplicationQueues received abort, ignoring.  Reason: " + why);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(e);
+      }
+    }
+
+    @Override
+    public boolean isAborted() {
+      return false;
+    }
+  }
+}
-- 
2.9.2
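
For reference, a minimal sketch of calling the new ZKUtil.getReplicationZnodesDump(ZooKeeperWatcher)
helper from application code. The class name, the watcher identifier string, and the inline
Abortable (modeled on the patch's WarnOnlyAbortable) are illustrative assumptions, not part of
the patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.Abortable;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.zookeeper.ZKUtil;
    import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;

    public class ReplicationZnodesDumpExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // The identifier is arbitrary; this Abortable only logs, like WarnOnlyAbortable.
        ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "replication-znodes-dump",
            new Abortable() {
              @Override
              public void abort(String why, Throwable e) {
                System.err.println("Received abort, ignoring. Reason: " + why);
              }
              @Override
              public boolean isAborted() {
                return false;
              }
            });
        try {
          // Prints every replication znode and its stored log position.
          System.out.println(ZKUtil.getReplicationZnodesDump(zkw));
        } finally {
          zkw.close();
        }
      }
    }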
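
The tool itself runs from the shell as shown in its class comment
(hbase org.apache.hadoop.hbase.replication.regionserver.DumpReplicationQueues). For the
embedded use that the setConfigure() hook is intended for, a sketch follows; the driver class
and the quorum value are hypothetical, and note that the tool's main() ends in System.exit():

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.replication.regionserver.DumpReplicationQueues;

    public class DumpReplicationQueuesDriver {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // "zk1.example.com" is a placeholder; point this at the cluster to inspect.
        conf.set("hbase.zookeeper.quorum", "zk1.example.com");
        // Hand the tool its Configuration before it creates its connection and admin.
        DumpReplicationQueues.setConfigure(conf);
        // Runs the dump; the JVM terminates via System.exit() when it finishes.
        DumpReplicationQueues.main(new String[0]);
      }
    }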