diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationInfoTool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationInfoTool.java
new file mode 100644
index 0000000..bac0a3c
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationInfoTool.java
@@ -0,0 +1,467 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication;
+
+import com.google.protobuf.Message;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationWALReaderManager;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Command-line tool that reports, for each replication peer, the WALs still queued
+ * in ZooKeeper and how far each peer has progressed through them.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class ReplicationInfoTool extends Configured
+    implements Tool, Abortable {
+  private static final Log LOG = LogFactory.getLog(ReplicationInfoTool.class);
+
+  private interface PeerVisitor {
+    void visitPeer(String peerId, ZooKeeperProtos.ReplicationPeer peer,
+        ZooKeeperProtos.ReplicationState state) throws IOException;
+  }
+
+  private interface WalVisitor {
+    void visitWal(String peerId, String rsName, String walName,
+        ZooKeeperProtos.ReplicationHLogPosition state) throws IOException;
+  }
+
+  private interface HFileVisitor {
+    void visitHFile(String peerId, String hfileName) throws IOException;
+  }
+
+  // TODO: use ReplicationQueueClient & co
+  public static class ReplicationZkInfo extends ReplicationStateZKBase {
+    private final ThreadPoolExecutor threadPool;
+
+    public ReplicationZkInfo(final ZooKeeperWatcher zkw, final Abortable abortable) {
+      // single worker thread; rejected tasks fall back to running in the caller thread
+      this(zkw, abortable,
+        new ThreadPoolExecutor(0, 1, 0, TimeUnit.DAYS, new SynchronousQueue<Runnable>(),
+          new RejectedExecutionHandler() {
+            @Override
+            public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
+              runnable.run();
+            }
+          }));
+    }
+
+    public ReplicationZkInfo(final ZooKeeperWatcher zkw, final Abortable abortable,
+        final ThreadPoolExecutor threadPool) {
+      super(zkw, zkw.getConfiguration(), abortable);
+      this.threadPool = threadPool;
+    }
+
+    public void visitPeers(final PeerVisitor visitor) throws IOException, KeeperException {
+      for (String peerId : ZKUtil.listChildrenNoWatch(zookeeper, peersZNode)) {
+        final String peerZnode = ZKUtil.joinZNode(peersZNode, peerId);
+        final String peerStateZnode = ZKUtil.joinZNode(peerZnode, peerStateNodeName);
+        try {
+          ZooKeeperProtos.ReplicationPeer peerData = parseZkData(peerZnode,
+            ZooKeeperProtos.ReplicationPeer.newBuilder()).build();
+
+          ZooKeeperProtos.ReplicationState stateData = parseZkData(peerStateZnode,
+            ZooKeeperProtos.ReplicationState.newBuilder()).build();
+
+          visitor.visitPeer(peerId, peerData, stateData);
+        } catch (IOException ipbe) {
+          LOG.warn("Got Exception while parsing peer: " + peerId, ipbe);
+        }
+      }
+    }
+
+    public void visitWals(final WalVisitor visitor) throws IOException, KeeperException {
+      final List<String> servers = ZKUtil.listChildrenNoWatch(zookeeper, queuesZNode);
+      if (servers == null || servers.size() == 0) return;
+
+      ArrayList<Future<Void>> futures = new ArrayList<Future<Void>>(servers.size());
+      for (final String rsName: servers) {
+        futures.add(threadPool.submit(new Callable<Void>() {
+          @Override
+          public Void call() throws IOException, KeeperException {
+            visitRsWals(rsName, visitor);
+            return null;
+          }
+        }));
+      }
+
+      for (int i = 0; i < futures.size(); ++i) {
+        try {
+          futures.get(i).get();
+        } catch (InterruptedException e) {
+          throw new InterruptedIOException(e.getMessage());
+        } catch (ExecutionException e) {
+          throw new IOException(e.getCause());
+        }
+      }
+    }
+
+    private void visitRsWals(final String regionServerName, final WalVisitor visitor)
+        throws IOException, KeeperException {
+      final String regionServerZnode = ZKUtil.joinZNode(queuesZNode, regionServerName);
+      for (String peerId : ZKUtil.listChildrenNoWatch(zookeeper, regionServerZnode)) {
+        final String peerIdZnode = ZKUtil.joinZNode(regionServerZnode, peerId);
+        for (String walName : ZKUtil.listChildrenNoWatch(zookeeper, peerIdZnode)) {
+          final String walZnode = ZKUtil.joinZNode(peerIdZnode, walName);
+          try {
+            ZooKeeperProtos.ReplicationHLogPosition walPosition = parseZkData(walZnode,
+              ZooKeeperProtos.ReplicationHLogPosition.newBuilder()).build();
+            visitor.visitWal(peerId, regionServerName, walName, walPosition);
+          } catch (IOException ipbe) {
" + peerId, ipbe); + } + } + } + } + + public void visitHFiles(final HFileVisitor visitor) throws IOException, KeeperException { + for (String hfile : ZKUtil.listChildrenNoWatch(zookeeper, hfileRefsZNode)) { + final String hfileZnode = ZKUtil.joinZNode(hfileRefsZNode, hfile); + for (String peerId : ZKUtil.listChildrenNoWatch(zookeeper, hfileZnode)) { + visitor.visitHFile(peerId, hfile); + } + } + } + + private TBuilder parseZkData( + final String znode, final TBuilder builder) throws IOException, KeeperException { + byte[] data; + try { + data = ZKUtil.getData(zookeeper, znode); + } catch (InterruptedException e) { + zookeeper.interruptedException(e); + throw (InterruptedIOException)new InterruptedIOException().initCause(e); + } + + int pblen = ProtobufUtil.lengthOfPBMagic(); + ProtobufUtil.mergeFrom(builder, data, pblen, data.length - pblen); + return builder; + } + } + + public static class WalInfo { + private static final Pattern PATTERN_CTIME = Pattern.compile("\\.(\\d+)"); + + private final String serverName; + private final String walName; + + private boolean walClosed; + private long walCreationTime; + private long walSize; + + private final Map peerState = + new ConcurrentHashMap(); + + public WalInfo(String serverName, String walName) { + this.serverName = serverName; + this.walName = walName; + } + + void addPeer(final PeerInfo peer, final ZooKeeperProtos.ReplicationHLogPosition walPos) { + peerState.put(peer, walPos); + } + + void setStatus(final Configuration conf, final FileStatus status) { + // try to find the creation time from name + Matcher m = PATTERN_CTIME.matcher(walName); + this.walCreationTime = m.find() ? Long.parseLong(m.group(1)) : status.getModificationTime(); + // try to guess closed state and file size + this.walClosed = isClosed(conf, status); + this.walSize = status.getLen(); + } + + private static boolean isClosed(final Configuration conf, final FileStatus status) { + try { + FileSystem fs = status.getPath().getFileSystem(conf); + if (fs instanceof DistributedFileSystem) { + return ((DistributedFileSystem)fs).isFileClosed(status.getPath()); + } + } catch (IOException e) { + LOG.warn("unable to get the close state of " + status.getPath(), e); + } + // guess + return status.getLen() > 0; + } + + public String getWalName() { + return walName; + } + + public String getWalServer() { + return serverName; + } + + public long getCreationTimestamp() { + return walCreationTime; + } + + public boolean isWalClosed() { + return walClosed; + } + + public long getWalSize() { + return walSize; + } + + public Collection getPeers() { + return peerState.keySet(); + } + + public long getPeerOffset(final PeerInfo peer) { + ZooKeeperProtos.ReplicationHLogPosition walPos = peerState.get(peer); + return walPos.getPosition(); + } + + public float getPeerProgress(final PeerInfo peer) { + // TODO: Assume that the trailer is 8 byte (4 int + 4 magic) + return getPeerOffset(peer) / (float)Math.max(walSize, walSize - 8); + } + } + + public static class PeerInfo { + private final ZooKeeperProtos.ReplicationState state; + private final ZooKeeperProtos.ReplicationPeer peer; + private final String peerId; + + private final ConcurrentLinkedQueue wals = new ConcurrentLinkedQueue(); + + public PeerInfo(String peerId, ZooKeeperProtos.ReplicationPeer peer, + ZooKeeperProtos.ReplicationState state) { + this.peerId = peerId; + this.peer = peer; + this.state = state; + } + + void addWal(WalInfo wal) { + wals.add(wal); + } + + public String getPeerId() { + return peerId; + } + + public String 
+      return peer.getClusterkey();
+    }
+
+    public boolean isEnabled() {
+      switch (state.getState()) {
+        case ENABLED: return true;
+        case DISABLED: return false;
+      }
+      return false;
+    }
+
+    public Collection<WalInfo> getWals() {
+      return wals;
+    }
+
+    public String getPeerConfig() {
+      return peer.toString();
+    }
+  }
+
+  public static class ReplicationWalProgress {
+    private final List<PeerInfo> peers;
+    private final List<WalInfo> wals;
+
+    public ReplicationWalProgress(List<PeerInfo> peers, List<WalInfo> wals) {
+      this.peers = peers;
+      this.wals = wals;
+      Collections.sort(this.wals, new Comparator<WalInfo>() {
+        @Override
+        public int compare(WalInfo o1, WalInfo o2) {
+          return Long.compare(o1.getCreationTimestamp(), o2.getCreationTimestamp());
+        }
+      });
+    }
+
+    public List<PeerInfo> getPeers() {
+      return peers;
+    }
+
+    public List<WalInfo> getWals() {
+      return wals;
+    }
+  }
+
+  public static ReplicationWalProgress getWalProgress(final FileSystem fs, final Path rootDir,
+      final ZooKeeperWatcher zkw, final Abortable abortable) throws IOException, KeeperException {
+    ThreadPoolExecutor threadPool = getThreadPoolExecutor(32, "ReplicationInfo");
+    ReplicationZkInfo replicationInfo = new ReplicationZkInfo(zkw, abortable, threadPool);
+
+    final Map<String, PeerInfo> peers = new HashMap<String, PeerInfo>();
+    replicationInfo.visitPeers(new PeerVisitor() {
+      @Override
+      public void visitPeer(String peerId, ZooKeeperProtos.ReplicationPeer peer,
+          ZooKeeperProtos.ReplicationState state) throws IOException {
+        peers.put(peerId, new PeerInfo(peerId, peer, state));
+      }
+    });
+
+    final ConcurrentHashMap<String, WalInfo> wals = new ConcurrentHashMap<String, WalInfo>();
+    replicationInfo.visitWals(new WalVisitor() {
+      @Override
+      public void visitWal(String peerId, String rsName, String walName,
+          ZooKeeperProtos.ReplicationHLogPosition state) throws IOException {
+        PeerInfo peer = peers.get(peerId);
+        if (peer == null) {
+          LOG.error(String.format("found a wal without peer. peerId=%s rsName=%s wal=%s",
peerId=%s rsName=%s wal=%s", + peerId, rsName, walName)); + return; + } + + // get the wal info + String walKey = rsName + walName; + WalInfo wal = wals.get(walKey); + if (wal == null) { + wal = new WalInfo(rsName, walName); + WalInfo oldWal = wals.putIfAbsent(walKey, wal); + if (oldWal != null) wal = oldWal; + wal.setStatus(zkw.getConfiguration(), + getWalFileStatus(fs, rootDir, rsName, walName)); + } + + // add the peer and position to the wal + wal.addPeer(peer, state); + // add the wal to the peer + peer.addWal(wal); + } + }); + + return new ReplicationWalProgress(new ArrayList(peers.values()), + new ArrayList(wals.values())); + } + + private static FileStatus getWalFileStatus(final FileSystem fs, final Path rootDir, + final String serverName, final String walName) throws IOException { + final Path[] paths = ReplicationWALReaderManager.getRsWalPaths(rootDir, serverName, walName); + for (int i = 0; i < paths.length; ++i) { + try { + FileStatus status = fs.getFileStatus(paths[i]); + if (status != null) return status; + } catch (FileNotFoundException e) { + continue; + } + } + + final Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME); + try { + return fs.getFileStatus(new Path(oldLogDir, walName)); + } catch (FileNotFoundException e) { + LOG.warn(String.format("unable to find wal=%s server=%s", walName, serverName)); + return null; + } + } + + private static ThreadPoolExecutor getThreadPoolExecutor(int poolSize, String name) { + return Threads.getBoundedCachedThreadPool(poolSize, 60L, TimeUnit.SECONDS, + Threads.newDaemonThreadFactory(name, new UncaughtExceptionHandler() { + @Override + public void uncaughtException(Thread t, Throwable e) { + LOG.warn("Failed to execute thread " + t.getName(), e); + } + })); + } + + @Override + public void abort(String why, Throwable e) { + LOG.fatal("abort " + why, e); + System.exit(1); + } + + @Override + public boolean isAborted() { + return false; + } + + public int run(String[] args) throws IOException, KeeperException { + final FileSystem fs = FileSystem.get(getConf()); + final ZooKeeperWatcher zkw = new ZooKeeperWatcher(getConf(), "ReplicationInfo", this); + final Path rootDir = FSUtils.getRootDir(getConf()); + + final ReplicationWalProgress walProgress = getWalProgress(fs, rootDir, zkw, this); + + for (PeerInfo peer: walProgress.getPeers()) { + System.out.printf("PeerId %s %s (%d wals) %s %n", peer.getPeerId(), + peer.isEnabled() ? 
"ENABLED" : "DISABLED", + peer.getWals().size(), + peer.getPeerConfig()); + } + + for (WalInfo wal: walProgress.getWals()) { + System.out.printf("WAL %s server %s (%s)%n", + wal.getWalName(), wal.getWalServer(), + StringUtils.humanReadableInt(wal.getWalSize())); + for (PeerInfo peer: walProgress.getPeers()) { + System.out.printf(" - PeerId %s progress %s (%.3f%%) %n", + peer.getPeerId(), + StringUtils.humanReadableInt(wal.getPeerOffset(peer)), + wal.getPeerProgress(peer)); + } + } + + return 0; + } + + public static void main(String[] args) throws Exception { + ToolRunner.run(HBaseConfiguration.create(), new ReplicationInfoTool(), args); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index a25c80b..02d9902 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -838,13 +838,8 @@ public class ReplicationSource extends Thread LOG.info("NB dead servers : " + deadRegionServers.size()); final Path rootDir = FSUtils.getRootDir(conf); for (String curDeadServerName : deadRegionServers) { - final Path deadRsDirectory = new Path(rootDir, - DefaultWALProvider.getWALDirectoryName(curDeadServerName)); - Path[] locs = new Path[] { - new Path(deadRsDirectory, currentPath.getName()), - new Path(deadRsDirectory.suffix(DefaultWALProvider.SPLITTING_EXT), - currentPath.getName()), - }; + final Path[] locs = ReplicationWALReaderManager.getRsWalPaths(rootDir, + curDeadServerName, currentPath.getName()); for (Path possibleLogLocation : locs) { LOG.info("Possible location " + possibleLogLocation.toUri().toString()); if (manager.getFs().exists(possibleLogLocation)) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationWALReaderManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationWALReaderManager.java index b63f66b..341e8b9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationWALReaderManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationWALReaderManager.java @@ -27,6 +27,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.wal.WAL.Reader; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WALFactory; +import org.apache.hadoop.hbase.wal.DefaultWALProvider; import java.io.IOException; @@ -142,4 +143,12 @@ public class ReplicationWALReaderManager { } } + public static Path[] getRsWalPaths(final Path rootDir, final String serverName, + final String walName) { + final Path rsDirectory = new Path(rootDir, DefaultWALProvider.getWALDirectoryName(serverName)); + return new Path[] { + new Path(rsDirectory, walName), + new Path(rsDirectory.suffix(DefaultWALProvider.SPLITTING_EXT), walName), + }; + } } diff --git a/hbase-server/src/main/resources/hbase-webapps/master/replication.jsp b/hbase-server/src/main/resources/hbase-webapps/master/replication.jsp new file mode 100644 index 0000000..127f2ce --- /dev/null +++ b/hbase-server/src/main/resources/hbase-webapps/master/replication.jsp @@ -0,0 +1,146 @@ +<%-- +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+--%>
+<%@ page contentType="text/html;charset=UTF-8"
+  import="static org.apache.commons.lang.StringEscapeUtils.escapeXml"
+  import="java.util.Date"
+  import="org.apache.hadoop.fs.FileSystem"
+  import="org.apache.hadoop.fs.Path"
+  import="org.apache.hadoop.hbase.HBaseConfiguration"
+  import="org.apache.hadoop.hbase.master.HMaster"
+  import="org.apache.hadoop.hbase.replication.ReplicationInfoTool"
+  import="org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher"
+  import="org.apache.hadoop.util.StringUtils"
+%>
+<%
+  HMaster master = (HMaster)getServletContext().getAttribute(HMaster.MASTER);
+  FileSystem fs = master.getMasterFileSystem().getFileSystem();
+  ZooKeeperWatcher zkw = master.getZooKeeper();
+  Path rootDir = master.getMasterFileSystem().getRootDir();
+
+  ReplicationInfoTool.ReplicationWalProgress walProgress =
+    ReplicationInfoTool.getWalProgress(fs, rootDir, zkw, master);
+%>
+<!DOCTYPE html>
+<html lang="en">
+  <head>
+    <meta charset="utf-8">
+    <title>HBase Master Replication: <%= master.getServerName() %></title>
+    <link href="/static/css/bootstrap.min.css" rel="stylesheet">
+    <link href="/static/css/bootstrap-theme.min.css" rel="stylesheet">
+    <link href="/static/css/hbase.css" rel="stylesheet">
+  </head>
+  <body>
+    <div class="container-fluid content">
+      <div class="row">
+        <div class="page-header">
+          <h1>Peers</h1>
+        </div>
+      </div>
+      <table class="table table-striped">
+        <tr>
+          <th>Id</th>
+          <th>State</th>
+          <th>Wals</th>
+          <th>Config</th>
+        </tr>
+        <% for (ReplicationInfoTool.PeerInfo peer: walProgress.getPeers()) { %>
+        <tr>
+          <td><%= peer.getPeerId() %></td>
+          <td><%= peer.isEnabled() ? "ENABLED" : "DISABLED" %></td>
+          <td><%= peer.getWals().size() %></td>
+          <td><%= escapeXml(peer.getPeerConfig()) %></td>
+        </tr>
+        <% } %>
+      </table>
+
+      <div class="row">
+        <div class="page-header">
+          <h1>Wals</h1>
+        </div>
+      </div>
+      <p>
+        NOTE: The progress percentage is an estimate based on offset and wal size.
+      </p>
+      <table class="table table-striped">
+        <tr>
+          <th>Creation Time</th>
+          <th>WAL</th>
+          <th>WAL Size</th>
+          <th>Peer Id</th>
+          <th>Offset/Progress</th>
+        </tr>
+        <% for (ReplicationInfoTool.WalInfo wal: walProgress.getWals()) { %>
+        <% for (ReplicationInfoTool.PeerInfo peer: wal.getPeers()) { %>
+        <tr>
+          <td><%= new Date(wal.getCreationTimestamp()) %></td>
+          <td><%= escapeXml(wal.getWalName()) %></td>
+          <td>
+            <%= StringUtils.humanReadableInt(wal.getWalSize()) %>
+            <% if (!wal.isWalClosed()) { %>
+              (WAL NOT CLOSED)
+            <% } %>
+          </td>
+          <td><%= peer.getPeerId() %></td>
+          <td>
+            <%= StringUtils.humanReadableInt(wal.getPeerOffset(peer)) %>
+            <% if (wal.getWalSize() > 0) { %>
+              <%= String.format("(~%.2f%%)", wal.getPeerProgress(peer) * 100.0) %>
+            <% } %>
+          </td>
+        </tr>
+        <% } %>
+        <% } %>
+      </table>
+    </div>
+    <script src="/static/js/jquery.min.js" type="text/javascript"></script>
+    <script src="/static/js/bootstrap.min.js" type="text/javascript"></script>
+  </body>
+</html>
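
For reviewers, a minimal sketch of how the API added by this patch (ReplicationInfoTool.getWalProgress() plus the PeerInfo/WalInfo accessors) could be driven from standalone code, mirroring what the tool's run() method and the new replication.jsp do. The example class name, the Abortable stub, and the way the ZooKeeperWatcher is built here are illustrative assumptions, not part of the patch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.replication.ReplicationInfoTool;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;

public class ReplicationProgressExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    FileSystem fs = FileSystem.get(conf);
    Path rootDir = FSUtils.getRootDir(conf);

    // Minimal Abortable stub for the example: just rethrow instead of exiting.
    Abortable abortable = new Abortable() {
      @Override public void abort(String why, Throwable e) { throw new RuntimeException(why, e); }
      @Override public boolean isAborted() { return false; }
    };

    ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "replication-progress-example", abortable);
    try {
      ReplicationInfoTool.ReplicationWalProgress progress =
          ReplicationInfoTool.getWalProgress(fs, rootDir, zkw, abortable);
      // Walk every queued WAL and report how far each peer has shipped it.
      for (ReplicationInfoTool.WalInfo wal : progress.getWals()) {
        for (ReplicationInfoTool.PeerInfo peer : wal.getPeers()) {
          System.out.printf("wal=%s peer=%s offset=%d (%.1f%%)%n",
              wal.getWalName(), peer.getPeerId(),
              wal.getPeerOffset(peer), wal.getPeerProgress(peer) * 100.0f);
        }
      }
    } finally {
      zkw.close();
    }
  }
}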