From c58e23556e37bbfc75c2b85969213a0b28db7132 Mon Sep 17 00:00:00 2001
From: jingyuntian
Date: Fri, 30 Mar 2018 18:45:52 +0800
Subject: [PATCH] replicationMasterUI

---
 .../hbase/tmpl/master/MasterStatusTmpl.jamon       |  59 ++++++++++
 .../hbase/tmpl/master/RegionServerListTmpl.jamon   |  88 +++++++++++++--
 .../org/apache/hadoop/hbase/master/HMaster.java    |  27 +++++
 .../hbase/master/TestGetReplicationLoad.java       | 119 +++++++++++++++++++++
 4 files changed, 284 insertions(+), 9 deletions(-)
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestGetReplicationLoad.java

diff --git a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/MasterStatusTmpl.jamon b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/MasterStatusTmpl.jamon
index 3091e18..85ca72e 100644
--- a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/MasterStatusTmpl.jamon
+++ b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/MasterStatusTmpl.jamon
@@ -31,8 +31,11 @@ AssignmentManager assignmentManager = null;
 <%import>
 java.util.*;
 java.io.IOException;
+org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
 org.apache.hadoop.hbase.client.RegionInfo;
 org.apache.hadoop.hbase.client.TableDescriptor;
+org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
 org.apache.hadoop.hbase.HBaseConfiguration;
 org.apache.hadoop.hbase.HConstants;
 org.apache.hadoop.hbase.HTableDescriptor;
@@ -259,6 +262,10 @@ AssignmentManager assignmentManager = master.getAssignmentManager();
+          <section>
+            <h2><a name="peers">Peers</a></h2>
+            <& peerConfigs &>
+          </section>
           <%if master.getAssignmentManager() != null %>
           <& AssignmentManagerStatusTmpl; assignmentManager=master.getAssignmentManager()&>
@@ -564,3 +571,55 @@ AssignmentManager assignmentManager = master.getAssignmentManager();
+
+<%def peerConfigs>
+<%java>
+    List<ReplicationPeerDescription> peers = null;
+    if (master.getReplicationPeerManager() != null) {
+      peers = master.getReplicationPeerManager().listPeers(null);
+    }
+</%java>
+<table class="table table-striped">
+  <tr>
+    <th>Peer Id</th>
+    <th>Cluster Key</th>
+    <th>State</th>
+    <th>IsSerial</th>
+    <th>Bandwidth</th>
+    <th>ReplicateAll</th>
+    <th>Namespaces</th>
+    <th>Exclude Namespaces</th>
+    <th>Table Cfs</th>
+    <th>Exclude Table Cfs</th>
+  </tr>
+<%if (peers != null && peers.size() > 0)%>
+  <%for ReplicationPeerDescription peer : peers %>
+  <%java>
+    String peerId = peer.getPeerId();
+    ReplicationPeerConfig peerConfig = peer.getPeerConfig();
+  </%java>
+  <tr>
+    <td><% peerId %></td>
+    <td><% peerConfig.getClusterKey() %></td>
+    <td><% peer.isEnabled() ? "ENABLED" : "DISABLED" %></td>
+    <td><% peerConfig.isSerial() %></td>
+    <td><% peerConfig.getBandwidth() == 0? "UNLIMITED" : StringUtils.humanReadableInt(peerConfig.getBandwidth()) %></td>
+    <td><% peerConfig.replicateAllUserTables() %></td>
+    <td>
+      <% peerConfig.getNamespaces() == null ? "" : ReplicationPeerConfigUtil.convertToString(peerConfig.getNamespaces()).replaceAll(";", "; ") %>
+    </td>
+    <td>
+      <% peerConfig.getExcludeNamespaces() == null ? "" : ReplicationPeerConfigUtil.convertToString(peerConfig.getExcludeNamespaces()).replaceAll(";", "; ") %>
+    </td>
+    <td>
+      <% peerConfig.getTableCFsMap() == null ? "" : ReplicationPeerConfigUtil.convertToString(peerConfig.getTableCFsMap()).replaceAll(";", "; ") %>
+    </td>
+    <td>
+      <% peerConfig.getExcludeTableCFsMap() == null ? "" : ReplicationPeerConfigUtil.convertToString(peerConfig.getExcludeTableCFsMap()).replaceAll(";", "; ") %>
+    </td>
+  </tr>
+  </%for>
+</%if>
+  <tr><td>Total: <% (peers != null) ? peers.size() : 0 %></td></tr>
+</table>
+</%def>
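
Note on the peerConfigs def above: it renders one table row per replication peer, reading the same ReplicationPeerDescription and ReplicationPeerConfig accessors a client would use. The following is a minimal sketch only, assuming a running HBase 2.x cluster and the Admin#listReplicationPeers() API; the class name is illustrative and it is not part of the patch:

    import java.io.IOException;
    import java.util.List;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
    import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;

    public class ListPeersSketch {
      public static void main(String[] args) throws IOException {
        try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
            Admin admin = conn.getAdmin()) {
          // One line per peer, mirroring the columns of the new Peers table.
          List<ReplicationPeerDescription> peers = admin.listReplicationPeers();
          for (ReplicationPeerDescription peer : peers) {
            ReplicationPeerConfig config = peer.getPeerConfig();
            System.out.println(peer.getPeerId()
                + " clusterKey=" + config.getClusterKey()
                + " state=" + (peer.isEnabled() ? "ENABLED" : "DISABLED")
                + " serial=" + config.isSerial()
                + " bandwidth=" + config.getBandwidth()   // 0 is rendered as UNLIMITED in the UI
                + " replicateAll=" + config.replicateAllUserTables());
          }
        }
      }
    }

The template renders a bandwidth of 0 as UNLIMITED; the sketch simply prints the raw value.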
diff --git a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/RegionServerListTmpl.jamon b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/RegionServerListTmpl.jamon
index f353d32..c6c7fc3 100644
--- a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/RegionServerListTmpl.jamon
+++ b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/RegionServerListTmpl.jamon
@@ -26,11 +26,14 @@ HMaster master;
 <%import>
         java.util.*;
         org.apache.hadoop.hbase.master.HMaster;
+        org.apache.hadoop.hbase.procedure2.util.StringUtils;
+        org.apache.hadoop.hbase.replication.ReplicationLoadSource;
         org.apache.hadoop.hbase.RegionMetrics;
         org.apache.hadoop.hbase.ServerMetrics;
         org.apache.hadoop.hbase.ServerName;
         org.apache.hadoop.hbase.Size;
         org.apache.hadoop.hbase.util.VersionInfo;
+        org.apache.hadoop.hbase.util.Pair;
         org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
 </%import>
@@ -48,7 +51,8 @@ Arrays.sort(serverNames);
     <li class=""><a href="#tab_memoryStats" data-toggle="tab">Memory</a></li>
     <li class=""><a href="#tab_requestStats" data-toggle="tab">Requests</a></li>
     <li class=""><a href="#tab_storeStats" data-toggle="tab">Storefiles</a></li>
-    <li class=""><a href="#tab_compactStats" data-toggle="tab">Compactions</a></li>
+    <li class=""><a href="#tab_compactStats" data-toggle="tab">Compactions</a></li>
+    <li class=""><a href="#tab_replicationStats" data-toggle="tab">Replications</a></li>
 </ul>
@@ -63,9 +67,12 @@ Arrays.sort(serverNames);
     <div class="tab-pane" id="tab_storeStats">
         <& storeStats; serverNames = serverNames; &>
     </div>
-    <div class="tab-pane" id="tab_compactStats">
+    <div class="tab-pane" id="tab_compactStats">
         <& compactionStats; serverNames = serverNames; &>
     </div>
+    <div class="tab-pane" id="tab_replicationStats">
+        <& replicationStats; serverNames = serverNames; &>
+    </div>
 </div>
 </div>
@@ -111,7 +118,7 @@ Arrays.sort(serverNames);
     long startcode = serverName.getStartcode();
 </%java>
 <tr>
-    <td><& serverNameLink; serverName=serverName; serverLoad = sl; &></td>
+    <td><& serverNameLink; serverName=serverName; &></td>
     <td><% new Date(startcode) %></td>
     <td><% TraditionalBinaryPrefix.long2String(lastContact, "s", 1) %></td>
     <td><% version %></td>
@@ -158,7 +165,7 @@ for (ServerName serverName: serverNames) {
 }
 </%java>
 <tr>
-    <td><& serverNameLink; serverName=serverName; serverLoad = sl; &></td>
+    <td><& serverNameLink; serverName=serverName; &></td>
     <td><% TraditionalBinaryPrefix.long2String((long) sl.getUsedHeapSize().get(Size.Unit.MEGABYTE)
       * TraditionalBinaryPrefix.MEGA.value, "B", 1) %></td>
     <td><% TraditionalBinaryPrefix.long2String((long) sl.getMaxHeapSize().get(Size.Unit.MEGABYTE)
@@ -206,7 +213,7 @@ if (sl != null) {
 }
 </%java>
 <tr>
-<td><& serverNameLink; serverName=serverName; serverLoad = sl; &></td>
+<td><& serverNameLink; serverName=serverName; &></td>
 <td><% sl.getRequestCountPerSecond() %></td>
 <td><% readRequestCount %></td>
 <td><% filteredReadRequestCount %></td>
@@ -259,7 +266,7 @@ if (sl != null) {
 }
 </%java>
 <tr>
-<td><& serverNameLink; serverName=serverName; serverLoad = sl; &></td>
+<td><& serverNameLink; serverName=serverName; &></td>
 <td><% storeCount %></td>
 <td><% storeFileCount %></td>
 <td><% TraditionalBinaryPrefix.long2String(
@@ -312,7 +319,7 @@ if (totalCompactingCells > 0) {
 }
 </%java>
 <tr>
-<td><& serverNameLink; serverName=serverName; serverLoad = sl; &></td>
+<td><& serverNameLink; serverName=serverName; &></td>
 <td><% totalCompactingCells %></td>
 <td><% totalCompactedCells %></td>
 <td><% totalCompactingCells - totalCompactedCells %></td>
@@ -329,11 +336,74 @@ if (totalCompactingCells > 0) {
 </%for>
 </table>
 </%def>
 
+<%def replicationStats>
+<%args>
+    ServerName [] serverNames;
+</%args>
+<%java>
+    HashMap<String, List<Pair<ServerName, ReplicationLoadSource>>> replicationLoadSourceMap
+        = master.getReplicationLoad(serverNames);
+    List<String> peers = null;
+    if (replicationLoadSourceMap != null && replicationLoadSourceMap.size() > 0){
+        peers = new ArrayList<>(replicationLoadSourceMap.keySet());
+        Collections.sort(peers);
+    }
+</%java>
+
+<%if (replicationLoadSourceMap != null && replicationLoadSourceMap.size() > 0) %>
+
+<div class="tabbable">
+    <div class="tab-content">
+        <%java>
+            String active = "active";
+            for (String peer : peers){
+        </%java>
+        <div class="tab-pane <% active %>" id="tab_<% peer %>">
+            <table class="table table-striped">
+                <tr>
+                    <th>Server</th>
+                    <th>AgeOfLastShippedOp</th>
+                    <th>SizeOfLogQueue</th>
+                    <th>ReplicationLag</th>
+                </tr>
+                <%for Pair<ServerName, ReplicationLoadSource> pair: replicationLoadSourceMap.get(peer) %>
+                <tr>
+                    <td><& serverNameLink; serverName=pair.getFirst(); &></td>
+                    <td><% StringUtils.humanTimeDiff(pair.getSecond().getAgeOfLastShippedOp()) %></td>
+                    <td><% pair.getSecond().getSizeOfLogQueue() %></td>
+                    <td><% StringUtils.humanTimeDiff(pair.getSecond().getReplicationLag()) %></td>
+                </tr>
+                </%for>
+            </table>
+        </div>
+        <%java>
+            active = "";
+            }
+        </%java>
+    </div>
+</div>
+<%else>
+<p>No Peers Metrics</p>
+</%if>
+</%def>
+
 <%def serverNameLink>
 <%args>
         ServerName serverName;
-        ServerMetrics serverLoad;
 </%args>
 <%java>
         int infoPort = master.getRegionServerInfoPort(serverName);
@@ -352,7 +422,7 @@
 <%args>
         ServerName serverName;
 </%args>
-    <& serverNameLink; serverName=serverName; serverLoad = null; &>
+    <& serverNameLink; serverName=serverName; &>
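
Note on the replicationStats def above: it consumes the per-peer map produced by HMaster#getReplicationLoad (added below) and formats ages and lags with StringUtils.humanTimeDiff. A rough plain-Java equivalent of the rendering loop, shown only to illustrate the shape of the data it renders; the class and method names here are illustrative and not part of the patch:

    import java.util.HashMap;
    import java.util.List;
    import org.apache.hadoop.hbase.ServerName;
    import org.apache.hadoop.hbase.procedure2.util.StringUtils;
    import org.apache.hadoop.hbase.replication.ReplicationLoadSource;
    import org.apache.hadoop.hbase.util.Pair;

    public class ReplicationTabSketch {
      // Prints one line per (peer, region server) pair, matching the columns of the new tab.
      static void dump(HashMap<String, List<Pair<ServerName, ReplicationLoadSource>>> loadByPeer) {
        for (String peer : loadByPeer.keySet()) {
          for (Pair<ServerName, ReplicationLoadSource> pair : loadByPeer.get(peer)) {
            ReplicationLoadSource load = pair.getSecond();
            System.out.println(peer + "\t" + pair.getFirst()
                + "\tage=" + StringUtils.humanTimeDiff(load.getAgeOfLastShippedOp())
                + "\tlogQueue=" + load.getSizeOfLogQueue()
                + "\tlag=" + StringUtils.humanTimeDiff(load.getReplicationLag()));
          }
        }
      }
    }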

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 9dd685d..249849e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -169,6 +169,7 @@ import org.apache.hadoop.hbase.regionserver.RegionSplitPolicy;
 import org.apache.hadoop.hbase.regionserver.compactions.ExploringCompactionPolicy;
 import org.apache.hadoop.hbase.regionserver.compactions.FIFOCompactionPolicy;
 import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationLoadSource;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
 import org.apache.hadoop.hbase.replication.ReplicationUtils;
@@ -3620,6 +3621,32 @@ public class HMaster extends HRegionServer implements MasterServices {
     return replicationPeerManager;
   }
 
+  public HashMap<String, List<Pair<ServerName, ReplicationLoadSource>>>
+      getReplicationLoad(ServerName[] serverNames) {
+    List<ReplicationPeerDescription> peerList = this.getReplicationPeerManager().listPeers(null);
+    if (peerList == null) {
+      return null;
+    }
+    HashMap<String, List<Pair<ServerName, ReplicationLoadSource>>> replicationLoadSourceMap =
+        new HashMap<>(peerList.size());
+    peerList.stream()
+        .forEach(peer -> replicationLoadSourceMap.put(peer.getPeerId(), new ArrayList<>()));
+    for (ServerName serverName : serverNames) {
+      List<ReplicationLoadSource> replicationLoadSources =
+          getServerManager().getLoad(serverName).getReplicationLoadSourceList();
+      for (ReplicationLoadSource replicationLoadSource : replicationLoadSources) {
+        replicationLoadSourceMap.get(replicationLoadSource.getPeerID())
+            .add(new Pair<>(serverName, replicationLoadSource));
+      }
+    }
+    for (List<Pair<ServerName, ReplicationLoadSource>> loads : replicationLoadSourceMap.values()) {
+      if (loads.size() > 0) {
+        loads.sort(Comparator.comparingLong(load -> (-1) * load.getSecond().getReplicationLag()));
+      }
+    }
+    return replicationLoadSourceMap;
+  }
+
   /**
    * This method modifies the master's configuration in order to inject replication-related features
    */
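
Note on getReplicationLoad above: it groups each region server's ReplicationLoadSource entries by peer id and sorts every per-peer list so the largest replication lag comes first. A small self-contained illustration of that ordering, using the same comparator idea; the Load class below is a stand-in and not part of the patch:

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;

    public class LagOrderSketch {
      // Stand-in for Pair<ServerName, ReplicationLoadSource>; only the field the sort reads.
      static class Load {
        final String server;
        final long replicationLag;
        Load(String server, long replicationLag) {
          this.server = server;
          this.replicationLag = replicationLag;
        }
      }

      public static void main(String[] args) {
        List<Load> loads = new ArrayList<>();
        loads.add(new Load("rs1", 100L));
        loads.add(new Load("rs2", 5000L));
        loads.add(new Load("rs3", 0L));
        // Same idea as the patch: negate the lag so the most-lagged server sorts first.
        loads.sort(Comparator.comparingLong(load -> (-1) * load.replicationLag));
        // Prints rs2, rs1, rs3: the most-lagged source appears at the top of the UI table.
        loads.forEach(l -> System.out.println(l.server + " lag=" + l.replicationLag));
      }
    }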
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestGetReplicationLoad.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestGetReplicationLoad.java
new file mode 100644
index 0000000..836f4ff
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestGetReplicationLoad.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.replication.ReplicationLoadSource;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.zookeeper.KeeperException;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;
+
+@Category({ MasterTests.class, MediumTests.class })
+public class TestGetReplicationLoad {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestGetReplicationLoad.class);
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestGetReplicationLoad.class);
+
+  private static MiniHBaseCluster cluster;
+  private static HMaster master;
+  private static HBaseTestingUtility TEST_UTIL;
+
+  public static class MyMaster extends HMaster {
+    public MyMaster(Configuration conf) throws IOException, KeeperException, InterruptedException {
+      super(conf);
+    }
+
+    @Override
+    protected void tryRegionServerReport(long reportStartTime, long reportEndTime) {
+      // do nothing
+    }
+  }
+
+  @BeforeClass
+  public static void startCluster() throws Exception {
+    LOG.info("Starting cluster");
+    TEST_UTIL = new HBaseTestingUtility();
+    TEST_UTIL.startMiniCluster(1, 1, 1, null, TestMasterMetrics.MyMaster.class, null);
+    cluster = TEST_UTIL.getHBaseCluster();
+    LOG.info("Waiting for active/ready master");
+    cluster.waitForActiveAndReadyMaster();
+    master = cluster.getMaster();
+  }
+
+  @AfterClass
+  public static void after() throws Exception {
+    if (TEST_UTIL != null) {
+      TEST_UTIL.shutdownMiniCluster();
+    }
+  }
+
+  @Test
+  public void testGetReplicationMetrics() throws Exception {
+    String peer1 = "test1", peer2 = "test2";
+    long ageOfLastShippedOp = 2, replicationLag = 3, timeStampOfLastShippedOp = 4;
+    int sizeOfLogQueue = 5;
+    RegionServerStatusProtos.RegionServerReportRequest.Builder request =
+        RegionServerStatusProtos.RegionServerReportRequest.newBuilder();
+    ServerName serverName = cluster.getMaster(0).getServerName();
+    request.setServer(ProtobufUtil.toServerName(serverName));
+    ClusterStatusProtos.ReplicationLoadSource rload1 = ClusterStatusProtos.ReplicationLoadSource
+        .newBuilder().setPeerID(peer1).setAgeOfLastShippedOp(ageOfLastShippedOp)
+        .setReplicationLag(replicationLag).setTimeStampOfLastShippedOp(timeStampOfLastShippedOp)
+        .setSizeOfLogQueue(sizeOfLogQueue).build();
+    ClusterStatusProtos.ReplicationLoadSource rload2 =
+        ClusterStatusProtos.ReplicationLoadSource.newBuilder().setPeerID(peer2)
+            .setAgeOfLastShippedOp(ageOfLastShippedOp + 1).setReplicationLag(replicationLag + 1)
+            .setTimeStampOfLastShippedOp(timeStampOfLastShippedOp + 1)
+            .setSizeOfLogQueue(sizeOfLogQueue + 1).build();
+    ClusterStatusProtos.ServerLoad sl = ClusterStatusProtos.ServerLoad.newBuilder()
+        .addReplLoadSource(rload1).addReplLoadSource(rload2).build();
+    request.setLoad(sl);
+    master.getReplicationPeerManager().addPeer(peer1,
+        ReplicationPeerConfig.newBuilder().setClusterKey("test").build(), true);
+    master.getReplicationPeerManager().addPeer(peer2,
+        ReplicationPeerConfig.newBuilder().setClusterKey("test").build(), true);
+    master.getMasterRpcServices().regionServerReport(null, request.build());
+    HashMap<String, List<Pair<ServerName, ReplicationLoadSource>>> replicationLoad =
+        master.getReplicationLoad(new ServerName[] { serverName });
+    assertEquals("peer size ", 2, replicationLoad.size());
+    assertEquals("load size ", 1, replicationLoad.get(peer1).size());
+    assertEquals("log queue size of peer1", sizeOfLogQueue,
+        replicationLoad.get(peer1).get(0).getSecond().getSizeOfLogQueue());
+    assertEquals("replication lag of peer2", replicationLag + 1,
+        replicationLoad.get(peer2).get(0).getSecond().getReplicationLag());
+    master.stopMaster();
+  }
+}
--
2.7.4