From b4f9fb450cd088c240e84f22ec6a481f8a67ca42 Mon Sep 17 00:00:00 2001 From: Esteban Gutierrez Date: Mon, 22 Aug 2016 19:48:10 -0700 Subject: [PATCH] HBASE-16450 Shell tool to dump replication queues New tool to dump existing replication peers, configurations and queues when using HBase Replication. The tool provides two flags: --distributed This flag will poll each RS for information about the replication queues being processed on this RS. By default this is not enabled and the information about the replication queues and configuration will be obtained from ZooKeeper. --hdfs When --distributed is used, this flag will attempt to calculate the total size of the WAL files used by the replication queues. Since its possible that multiple peers can be configured this value can be overestimated. Signed-off-by: Matteo Bertozzi --- .../hbase/replication/ReplicationQueuesZKImpl.java | 2 +- .../org/apache/hadoop/hbase/zookeeper/ZKUtil.java | 11 + .../regionserver/DumpReplicationQueues.java | 355 +++++++++++++++++++++ 3 files changed, 367 insertions(+), 1 deletion(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java index 5c6fe4e..2659fa6 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java @@ -151,7 +151,7 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R return ZKUtil.parseWALPositionFrom(bytes); } catch (DeserializationException de) { LOG.warn("Failed to parse WALPosition for queueId=" + queueId + " and wal=" + filename - + "znode content, continuing."); + + " znode content, continuing."); } // if we can not parse the position, start at the beginning of the wal file // again diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java index 2dfc8d8..1ea6d62 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java @@ -1806,6 +1806,17 @@ public class ZKUtil { } } + /** + * Returns a string with replication znodes and position of the replication log + * @param zkw + * @return aq string of replication znodes and log positions + */ + public static String getReplicationZnodesDump(ZooKeeperWatcher zkw) throws KeeperException { + StringBuilder sb = new StringBuilder(); + getReplicationZnodesDump(zkw, sb); + return sb.toString(); + } + private static void appendRSZnodes(ZooKeeperWatcher zkw, String znode, StringBuilder sb) throws KeeperException { List stack = new LinkedList(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java new file mode 100644 index 0000000..bf38d6f --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java @@ -0,0 +1,355 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication.regionserver; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.client.ClusterConnection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; +import org.apache.hadoop.hbase.io.WALLink; +import org.apache.hadoop.hbase.procedure2.util.StringUtils; +import org.apache.hadoop.hbase.replication.*; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.apache.zookeeper.KeeperException; +import org.mortbay.util.IO; + +import java.io.IOException; +import java.util.*; + +/** + * Provides information about the existing states of replication, replication peers and queues. + * + * Usage: hbase org.apache.hadoop.hbase.replication.regionserver.DumpReplicationQueues [args] + * Arguments: --distributed Polls each RS to dump information about the queue + * --hdfs Reports HDFS usage by the replication queues (note: can be overestimated). + */ + +public class DumpReplicationQueues extends Configured implements Tool { + + private static final Log LOG = LogFactory.getLog(DumpReplicationQueues.class.getName()); + + static class DumpOptions { + boolean hdfs = false; + boolean distributed = false; + + public DumpOptions() { + } + + public DumpOptions(DumpOptions that) { + this.hdfs = that.hdfs; + this.distributed = that.distributed; + } + + boolean isHdfs () { + return hdfs; + } + + boolean isDistributed() { + return distributed; + } + + void setHdfs (boolean hdfs) { + this.hdfs = hdfs; + } + + void setDistributed(boolean distributed) { + this.distributed = distributed; + } + } + + static DumpOptions parseOpts(Queue args) { + DumpOptions opts = new DumpOptions(); + + String cmd = null; + while ((cmd = args.poll()) != null) { + if (cmd.equals("-h") || cmd.equals("--h") || cmd.equals("--help")) { + // place item back onto queue so that caller knows parsing was incomplete + args.add(cmd); + break; + } + final String hdfs = "--hdfs"; + if (cmd.equals(hdfs)) { + opts.setHdfs(true); + continue; + } + final String distributed = "--distributed"; + if (cmd.equals(distributed)) { + opts.setDistributed(true); + continue; + } else { + printUsageAndExit("ERROR: Unrecognized option/command: " + cmd, -1); + } + // check that --distributed is present when --hdfs is in the arguments + if (!opts.isDistributed() && opts.isHdfs()) { + printUsageAndExit("ERROR: --hdfs option can only be used with --distributed: " + cmd, -1); + } + } + return opts; + } + + /** + * Main + * + * @param args + * @throws Exception + */ + public static void main(String[] args) throws Exception { + Configuration conf = HBaseConfiguration.create(); + int ret = ToolRunner.run(conf, new DumpReplicationQueues(), args); + System.exit(ret); + } + + @Override + public int run(String[] args) throws Exception { + + int errCode = -1; + LinkedList argv = new LinkedList(); + argv.addAll(Arrays.asList(args)); + DumpOptions opts = parseOpts(argv); + + // args remaining, print help and exit + if (!argv.isEmpty()) { + errCode = 0; + printUsage(); + return errCode; + } + return dumpReplicationQueues(opts); + } + + protected void printUsage() { + printUsage(this.getClass().getName(), null); + } + + protected static void printUsage(final String message) { + printUsage(DumpReplicationQueues.class.getName(), message); + } + + protected static void printUsage(final String className, final String message) { + if (message != null && message.length() > 0) { + System.err.println(message); + } + System.err.println("Usage: java " + className + " \\"); + System.err.println(" [-D]*"); + System.err.println(); + System.err.println("General Options:"); + System.err.println(" distributed Poll each RS and print its own replication queue. " + + "Default only polls ZooKeeper"); + System.err.println(" hdfs Use HDFS to calculate usage of WALs by replication. It could be overestimated" + + " if replicating to multiple peers. --distributed flag is also needed."); + } + + protected static void printUsageAndExit(final String message, final int exitCode) { + printUsage(message); + System.exit(exitCode); + } + + private int dumpReplicationQueues(DumpOptions opts) throws Exception { + + Configuration conf = getConf(); + HBaseAdmin.checkHBaseAvailable(conf); + ReplicationAdmin replicationAdmin = new ReplicationAdmin(conf); + ClusterConnection connection = (ClusterConnection) ConnectionFactory.createConnection(conf); + long deleted = 0; + + ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "DumpReplicationQueues" + System.currentTimeMillis(), + new WarnOnlyAbortable(), true); + + try { + // Our zk watcher + LOG.info("Our Quorum: " + zkw.getQuorum()); + List> replicatedTables = replicationAdmin.listReplicated(); + if (replicatedTables.isEmpty()) { + LOG.info("No tables with a configured replication peer were found."); + return(0); + } else { + LOG.info("Replicated Tables: " + replicatedTables); + } + + Map peerConfigs = replicationAdmin.listPeerConfigs(); + + if (peerConfigs.isEmpty()) { + LOG.info("Replication is enabled but no peer configuration was found."); + } + + System.out.println("Dumping replication peers and configurations:"); + System.out.println(dumpPeersState(replicationAdmin, peerConfigs)); + + if (opts.isDistributed()) { + LOG.info("Found [--distributed], will poll each RegionServer."); + System.out.println(dumpQueues(connection, zkw, opts.isHdfs(), deleted)); + if (deleted > 0) { + LOG.warn("Found " + deleted +" deleted queues" + + ", run hbck -fixReplication in order to remove the deleted replication queues"); + } + } else { + // use ZK instead + System.out.print("Dumping replication znodes via ZooKeeper:"); + System.out.println(ZKUtil.getReplicationZnodesDump(zkw)); + } + return (0); + } catch (IOException e) { + return (-1); + } finally { + zkw.close(); + } + } + + public String dumpPeersState(ReplicationAdmin replicationAdmin, + Map peerConfigs) throws Exception { + Map currentConf; + StringBuilder sb = new StringBuilder(); + for (Map.Entry peer : peerConfigs.entrySet()) { + try { + sb.append("Peer: " + peer.getKey() + "\n"); + sb.append(" " + "State: " + (replicationAdmin.getPeerState(peer.getKey()) ? "ENABLED" : "DISABLED") + "\n"); + sb.append(" " + "Cluster Name: " + peer.getValue() + "\n"); + currentConf = peer.getValue().getConfiguration(); + // Only show when we have a custom configuration for the peer + if (currentConf.size() > 1) { + sb.append(" " + "Peer Configuration: " + currentConf + "\n"); + } + sb.append(" " + "Peer Table CFs: " + replicationAdmin.getPeerTableCFs(peer.getKey()) + "\n"); + } catch (ReplicationException re) { + sb.append("Got an exception while invoking ReplicationAdmin: " + re + "\n"); + } + } + return sb.toString(); + } + + public String dumpQueues(ClusterConnection connection, ZooKeeperWatcher zkw, boolean hdfs, long deleted) + throws Exception { + ReplicationQueuesClient queuesClient; + ReplicationPeers replicationPeers; + ReplicationQueues replicationQueues; + + StringBuilder sb = new StringBuilder(); + + queuesClient = ReplicationFactory.getReplicationQueuesClient(zkw, getConf(), connection); + queuesClient.init(); + replicationQueues = ReplicationFactory.getReplicationQueues(zkw, getConf(), connection); + replicationPeers = ReplicationFactory.getReplicationPeers(zkw, getConf(), queuesClient, connection); + replicationPeers.init(); + + // Loops each peer on each RS and dumps the queues + + Set peerIds = new HashSet(replicationPeers.getAllPeerIds()); + try { + List regionservers = queuesClient.getListOfReplicators(); + for (String regionserver : regionservers) { + List queueIds = queuesClient.getAllQueues(regionserver); + replicationQueues.init(regionserver); + for (String queueId : queueIds) { + ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId); + List wals = queuesClient.getLogsInQueue(regionserver, queueId); + if (!peerIds.contains(queueInfo.getPeerId())) { + deleted++; + sb.append(formatQueue(regionserver, replicationQueues, queueInfo, queueId, wals, true, hdfs)); + } else { + sb.append(formatQueue(regionserver, replicationQueues, queueInfo, queueId, wals, false, hdfs)); + } + } + } + } catch (KeeperException ke) { + throw new IOException(ke); + } + return sb.toString(); + } + + private String formatQueue(String regionserver, ReplicationQueues replicationQueues, ReplicationQueueInfo queueInfo, + String queueId, List wals, boolean isDeleted, boolean hdfs) throws Exception { + + StringBuilder sb = new StringBuilder(); + + List deadServers ; + + sb.append("Dumping replication queue info for RegionServer: [" + regionserver + "]" + "\n"); + sb.append(" PeerID: " + queueInfo.getPeerId() + "\n"); + sb.append(" Recovered: " + queueInfo.isQueueRecovered() + "\n"); + deadServers = queueInfo.getDeadRegionServers(); + if (deadServers.isEmpty()) { + sb.append(" No dead RegionServers found in this queue." + "\n"); + } else { + sb.append(" Dead RegionServers: " + deadServers + "\n"); + } + sb.append(" Was deleted: " + isDeleted + "\n"); + sb.append(" Number of WALs in replication queue: " + wals.size() + "\n"); + for (String wal : wals) { + long position = replicationQueues.getLogPosition(queueInfo.getPeerId(), wal); + sb.append(" Replication position for " + wal + ": " + (position > 0 ? position : "0" + + " (not started or nothing to replicate)") + "\n"); + } + + if (hdfs) { + FileSystem fs = FileSystem.get(getConf()); + sb.append(" Total size of WALs on HDFS for this queue: " + + StringUtils.humanSize(getTotalWALSize(fs, wals, regionserver)) + "\n"); + } + return sb.toString(); + } + /** + * return total size in bytes from a list of WALs + */ + private long getTotalWALSize(FileSystem fs, List wals, String server) throws IOException { + int size = 0; + FileStatus fileStatus; + + for (String wal : wals) { + try { + fileStatus = (new WALLink(getConf(), server, wal)).getFileStatus(fs); + } catch (IOException e) { + LOG.warn("WAL " + wal + " couldn't be found, skipping"); + break; + } + size += fileStatus.getLen(); + } + return size; + } + + private static class WarnOnlyAbortable implements Abortable { + @Override + public void abort(String why, Throwable e) { + LOG.warn("DumpReplicationQueue received abort, ignoring. Reason: " + why); + if (LOG.isDebugEnabled()) { + LOG.debug(e); + } + } + + @Override + public boolean isAborted() { + return false; + } + } + + private static void usage(final String errorMsg) { + if (errorMsg != null && errorMsg.length() > 0) { + LOG.error(errorMsg); + } + } +} -- 2.9.2