From 498cc8a0b3ce4990d3a285eb893d000a6a6b326e Mon Sep 17 00:00:00 2001 From: Andrew Purtell Date: Fri, 18 Jan 2019 14:58:34 -0800 Subject: [PATCH] HBASE-21680 Port HBASE-20194 (Basic Replication WebUI - Master) and HBASE-20193 (Basic Replication Web UI - Regionserver) to branch-1 HBASE-20193 Basic Replication Web UI - Regionserver HBASE-20194 Basic Replication WebUI - Master --- .../hbase/tmpl/master/MasterStatusTmpl.jamon | 42 ++++++ .../tmpl/master/RegionServerListTmpl.jamon | 86 +++++++++-- .../tmpl/regionserver/RSStatusTmpl.jamon | 4 + .../regionserver/ReplicationStatusTmpl.jamon | 105 ++++++++++++++ .../apache/hadoop/hbase/master/HMaster.java | 21 +++ .../hbase/regionserver/HRegionServer.java | 16 +++ .../regionserver/ReplicationService.java | 2 +- .../ReplicationSourceService.java | 6 + .../regionserver/MetricsSource.java | 37 +++-- .../regionserver/ReplicationLoad.java | 33 +++-- .../regionserver/ReplicationSource.java | 37 +++++ .../ReplicationSourceInterface.java | 6 + .../ReplicationSourceWALReaderThread.java | 11 +- .../regionserver/ReplicationStatus.java | 135 ++++++++++++++++++ .../hbase/master/TestGetReplicationLoad.java | 133 +++++++++++++++++ .../replication/ReplicationSourceDummy.java | 8 ++ .../replication/TestReplicationBase.java | 1 + .../TestReplicationMetricsforUI.java | 105 ++++++++++++++ 18 files changed, 750 insertions(+), 38 deletions(-) create mode 100644 hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/ReplicationStatusTmpl.jamon create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationStatus.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestGetReplicationLoad.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationMetricsforUI.java diff --git a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/MasterStatusTmpl.jamon 
b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/MasterStatusTmpl.jamon index efff6f70b8..f365221c51 100644 --- a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/MasterStatusTmpl.jamon +++ b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/MasterStatusTmpl.jamon @@ -43,6 +43,7 @@ org.apache.hadoop.hbase.ServerLoad; org.apache.hadoop.hbase.ServerName; org.apache.hadoop.hbase.client.Admin; org.apache.hadoop.hbase.client.HConnectionManager; +org.apache.hadoop.hbase.client.replication.ReplicationAdmin; org.apache.hadoop.hbase.HRegionInfo; org.apache.hadoop.hbase.master.RegionState; org.apache.hadoop.hbase.HTableDescriptor; @@ -52,6 +53,9 @@ org.apache.hadoop.hbase.tool.Canary; org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription; org.apache.hadoop.hbase.master.DeadServer; org.apache.hadoop.hbase.protobuf.ProtobufUtil; +org.apache.hadoop.hbase.replication.ReplicationPeer; +org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +org.apache.hadoop.hbase.replication.ReplicationSerDeHelper; org.apache.hadoop.hbase.security.visibility.VisibilityConstants; org.apache.hadoop.hbase.security.access.AccessControlLists; org.apache.hadoop.hbase.quotas.QuotaUtil; @@ -232,6 +236,10 @@ AssignmentManager assignmentManager = master.getAssignmentManager(); +
+

Peers

+ <& peerConfigs &> +
<%if master.getAssignmentManager() != null %> <& AssignmentManagerStatusTmpl; assignmentManager=master.getAssignmentManager()&> @@ -554,3 +562,37 @@ AssignmentManager assignmentManager = master.getAssignmentManager(); + +<%def peerConfigs> +<%java> + Map peers = null; + try (ReplicationAdmin admin = new ReplicationAdmin(master.getConfiguration())) { + peers = admin.listPeerConfigs(); + } + + + + + + + + +<%if (peers != null && peers.size() > 0)%> + <%for Map.Entry peer : peers.entrySet() %> + <%java> + String peerId = peer.getKey(); + ReplicationPeerConfig peerConfig = peer.getValue(); + + + + + + + + + + +
Peer IdCluster KeyBandwidthTable Cfs
<% peerId %><% peerConfig.getClusterKey() %><% peerConfig.getBandwidth() == 0? "UNLIMITED" : StringUtils.humanReadableInt(peerConfig.getBandwidth()) %> + <% peerConfig.getTableCFsMap() == null ? "" : ReplicationSerDeHelper.convertToString(peerConfig.getTableCFsMap()).replaceAll(";", "; ") %> +
Total: <% (peers != null) ? peers.size() : 0 %>
+ diff --git a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/RegionServerListTmpl.jamon b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/RegionServerListTmpl.jamon index 29c0f77fa1..8116b289f4 100644 --- a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/RegionServerListTmpl.jamon +++ b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/RegionServerListTmpl.jamon @@ -26,6 +26,8 @@ HMaster master; <%import> java.util.*; org.apache.hadoop.hbase.master.HMaster; + org.apache.hadoop.hbase.procedure2.util.StringUtils; + org.apache.hadoop.hbase.replication.ReplicationLoadSource; org.apache.hadoop.hbase.ServerLoad; org.apache.hadoop.hbase.ServerName; org.apache.hadoop.hbase.client.HBaseAdmin; @@ -33,6 +35,7 @@ HMaster master; org.apache.hadoop.hbase.HTableDescriptor; org.apache.hadoop.hbase.HBaseConfiguration; org.apache.hadoop.hbase.util.VersionInfo; + org.apache.hadoop.hbase.util.Pair; org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix; @@ -50,7 +53,8 @@ Arrays.sort(serverNames);
  • Memory
  • Requests
  • Storefiles
  • -
  • Compactions
  • +
  • Compactions
  • +
  • Replications
  • @@ -65,9 +69,12 @@ Arrays.sort(serverNames);
    <& storeStats; serverNames = serverNames; &>
    -
    +
    <& compactionStats; serverNames = serverNames; &>
    +
    + <& replicationStats; serverNames = serverNames; &> +
    @@ -117,7 +124,7 @@ Arrays.sort(serverNames); long startcode = serverName.getStartcode(); - <& serverNameLink; serverName=serverName; serverLoad = sl; &> + <& serverNameLink; serverName=serverName; &> <% new Date(startcode) %> <% TraditionalBinaryPrefix.long2String(lastContact, "s", 1) %> <% version %> @@ -164,7 +171,7 @@ for (ServerName serverName: serverNames) { if (sl != null) { - <& serverNameLink; serverName=serverName; serverLoad = sl; &> + <& serverNameLink; serverName=serverName; &> <% TraditionalBinaryPrefix.long2String(sl.getUsedHeapMB() * TraditionalBinaryPrefix.MEGA.value, "B", 1) %> <% TraditionalBinaryPrefix.long2String(sl.getMaxHeapMB() @@ -207,7 +214,7 @@ ServerLoad sl = master.getServerManager().getLoad(serverName); if (sl != null) { -<& serverNameLink; serverName=serverName; serverLoad = sl; &> +<& serverNameLink; serverName=serverName; &> <% String.format("%.0f", sl.getRequestsPerSecond()) %> <% sl.getReadRequestsCount() %> <% sl.getWriteRequestsCount() %> @@ -249,7 +256,7 @@ ServerLoad sl = master.getServerManager().getLoad(serverName); if (sl != null) { -<& serverNameLink; serverName=serverName; serverLoad = sl; &> +<& serverNameLink; serverName=serverName; &> <% sl.getStores() %> <% sl.getStorefiles() %> <% TraditionalBinaryPrefix.long2String( @@ -300,7 +307,7 @@ if (sl.getTotalCompactingKVs() > 0) { } -<& serverNameLink; serverName=serverName; serverLoad = sl; &> +<& serverNameLink; serverName=serverName; &> <% sl.getTotalCompactingKVs() %> <% sl.getCurrentCompactedKVs() %> <% sl.getTotalCompactingKVs() - sl.getCurrentCompactedKVs() %> @@ -318,11 +325,72 @@ if (sl.getTotalCompactingKVs() > 0) { +<%def replicationStats> +<%args> + ServerName [] serverNames; + +<%java> + HashMap>> replicationLoadSourceMap + = master.getReplicationLoad(serverNames); + List peers = null; + if (replicationLoadSourceMap != null && replicationLoadSourceMap.size() > 0){ + peers = new ArrayList<>(replicationLoadSourceMap.keySet()); + Collections.sort(peers); + } + 
+ +<%if (replicationLoadSourceMap != null && replicationLoadSourceMap.size() > 0) %> + +
    + +
    + <%java> + active = "active"; + for (String peer : peers){ + +
    + + + + + + + + + <%for Pair pair: replicationLoadSourceMap.get(peer) %> + + + + + + + +
    ServerAgeOfLastShippedOpSizeOfLogQueueReplicationLag
    <& serverNameLink; serverName=pair.getFirst(); &><% StringUtils.humanTimeDiff(pair.getSecond().getAgeOfLastShippedOp()) %><% pair.getSecond().getSizeOfLogQueue() %><% StringUtils.humanTimeDiff(pair.getSecond().getReplicationLag()) %>
    +
    + <%java> + active = ""; + } + +
    +
    +<%else> +

    No Peers Metrics

    + + <%def serverNameLink> <%args> ServerName serverName; - ServerLoad serverLoad; <%java> int infoPort = master.getRegionServerInfoPort(serverName); @@ -341,7 +409,7 @@ if (sl.getTotalCompactingKVs() > 0) { ServerName serverName; - <& serverNameLink; serverName=serverName; serverLoad = null; &> + <& serverNameLink; serverName=serverName; &> diff --git a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/RSStatusTmpl.jamon b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/RSStatusTmpl.jamon index a8d4003146..bc3bbd7856 100644 --- a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/RSStatusTmpl.jamon +++ b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/RSStatusTmpl.jamon @@ -122,6 +122,10 @@ org.apache.hadoop.hbase.zookeeper.MasterAddressTracker; <& RegionListTmpl; regionServer = regionServer; onlineRegions = onlineRegions; &> +
    +

    Replication Status

    + <& ReplicationStatusTmpl; regionServer = regionServer; &> +

    Software Attributes

    diff --git a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/ReplicationStatusTmpl.jamon b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/ReplicationStatusTmpl.jamon new file mode 100644 index 0000000000..7dc1c7f663 --- /dev/null +++ b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/ReplicationStatusTmpl.jamon @@ -0,0 +1,105 @@ +<%doc> + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +<%args> + HRegionServer regionServer; + +<%import> + java.util.*; + java.util.Map.Entry; + org.apache.hadoop.hbase.procedure2.util.StringUtils; + org.apache.hadoop.hbase.regionserver.HRegionServer; + org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus; + + +<%java> + Map walGroupsReplicationStatus = regionServer.getWalGroupsReplicationStatus(); + + +<%if (walGroupsReplicationStatus != null && walGroupsReplicationStatus.size() > 0) %> + +
    + +
    +
    + <& currentLog; metrics = walGroupsReplicationStatus; &> +
    +
    + <& replicationDelay; metrics = walGroupsReplicationStatus; &> +
    +
    +
    +

If the replication delay is UNKNOWN, this walGroup has not started replicating yet and it may be disabled. + If the size of the log is 0, the current HLog is being replicated, so an accurate size is not available because the file is not closed yet.

    + +<%else> +

    No Replication Metrics for Peers

    + + +<%def currentLog> +<%args> + Map metrics; + + + + + + + + + + + <%for Map.Entry entry: metrics.entrySet() %> + + + + + + + + + +
    PeerIdWalGroupCurrent LogSizeQueue SizeOffset
    <% entry.getValue().getPeerId() %><% entry.getValue().getWalGroup() %><% entry.getValue().getCurrentPath() %> <% StringUtils.humanSize(entry.getValue().getFileSize()) %><% entry.getValue().getQueueSize() %><% StringUtils.humanSize(entry.getValue().getCurrentPosition()) %>
    + + +<%def replicationDelay> +<%args> + Map metrics; + + + + + + + + + + <%for Map.Entry entry: metrics.entrySet() %> + + + + + + + + +
    PeerIdWalGroupCurrent LogLast Shipped AgeReplication Delay
    <% entry.getValue().getPeerId() %><% entry.getValue().getWalGroup() %><% entry.getValue().getCurrentPath() %> <% StringUtils.humanTimeDiff(entry.getValue().getAgeOfLastShippedOp()) %><% entry.getValue().getReplicationDelay() == Long.MAX_VALUE ? "UNKNOWN" : StringUtils.humanTimeDiff(entry.getValue().getReplicationDelay()) %>
    + diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 0d0b5c1e85..c1405b72b9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -150,6 +150,7 @@ import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost; import org.apache.hadoop.hbase.regionserver.RegionSplitPolicy; import org.apache.hadoop.hbase.regionserver.compactions.ExploringCompactionPolicy; import org.apache.hadoop.hbase.regionserver.compactions.FIFOCompactionPolicy; +import org.apache.hadoop.hbase.replication.ReplicationLoadSource; import org.apache.hadoop.hbase.replication.master.TableCFsUpdater; import org.apache.hadoop.hbase.replication.regionserver.Replication; import org.apache.hadoop.hbase.security.User; @@ -3292,4 +3293,24 @@ public class HMaster extends HRegionServer implements MasterServices, Server { public LoadBalancer getLoadBalancer() { return balancer; } + + public HashMap>> + getReplicationLoad(ServerName[] serverNames) { + HashMap>> replicationLoadSourceMap = + new HashMap<>(); + for (ServerName serverName : serverNames) { + List replicationLoadSources = + getServerManager().getLoad(serverName).getReplicationLoadSourceList(); + for (ReplicationLoadSource replicationLoadSource : replicationLoadSources) { + List> list = + replicationLoadSourceMap.get(replicationLoadSource.getPeerID()); + if (list == null) { + list = new ArrayList>(); + replicationLoadSourceMap.put(replicationLoadSource.getPeerID(), list); + } + list.add(new Pair<>(serverName, replicationLoadSource)); + } + } + return replicationLoadSourceMap; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index f7737de142..258f68ecba 100644 --- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -164,6 +164,8 @@ import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.replication.regionserver.Replication; import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad; +import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface; +import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus; import org.apache.hadoop.hbase.security.Superusers; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; @@ -2832,6 +2834,20 @@ public class HRegionServer extends HasThread implements return service; } + public Map getWalGroupsReplicationStatus(){ + Map walGroupsReplicationStatus = new TreeMap<>(); + if(!this.isOnline()){ + return walGroupsReplicationStatus; + } + List allSources = new ArrayList<>(); + allSources.addAll(replicationSourceHandler.getReplicationManager().getSources()); + allSources.addAll(replicationSourceHandler.getReplicationManager().getOldSources()); + for(ReplicationSourceInterface source: allSources){ + walGroupsReplicationStatus.putAll(source.getWalGroupStatus()); + } + return walGroupsReplicationStatus; + } + /** * Utility for constructing an instance of the passed HRegionServer class. 
* diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java index 25a27a9053..3a0ed85886 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java @@ -57,5 +57,5 @@ public interface ReplicationService { /** * Refresh and Get ReplicationLoad */ - public ReplicationLoad refreshAndGetReplicationLoad(); + ReplicationLoad refreshAndGetReplicationLoad(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java index 13b502b9d5..93f98f86f5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; +import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager; /** * A source for a replication stream has to expose this service. @@ -33,4 +34,9 @@ public interface ReplicationSourceService extends ReplicationService { * observe log rolls and log archival events. 
*/ WALActionsListener getWALActionsListener(); + + /** + * Returns the replication manager + */ + ReplicationSourceManager getReplicationManager(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java index 53e10741ee..550363280d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java @@ -21,8 +21,6 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.util.HashMap; import java.util.Map; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.CompatibilitySingletonFactory; import org.apache.hadoop.hbase.HBaseInterfaceAudience; @@ -37,10 +35,9 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION) public class MetricsSource implements BaseSource { - private static final Log LOG = LogFactory.getLog(MetricsSource.class); - // tracks last shipped timestamp for each wal group - private Map lastTimeStamps = new HashMap(); + private Map lastTimestamps = new HashMap<>(); + private Map ageOfLastShippedOp = new HashMap<>(); private long lastHFileRefsQueueSize = 0; private String id; @@ -87,7 +84,8 @@ public class MetricsSource implements BaseSource { long age = EnvironmentEdgeManager.currentTime() - timestamp; singleSourceSource.setLastShippedAge(age); globalSourceSource.setLastShippedAge(age); - this.lastTimeStamps.put(walGroup, timestamp); + this.ageOfLastShippedOp.put(walGroup, age); + this.lastTimestamps.put(walGroup, timestamp); } /** @@ -104,15 +102,33 @@ public class MetricsSource implements BaseSource { } 
this.singleSourceSourceByTable.get(tableName).setLastShippedAge(age); } + /** + * Get the last shipped timestamp of the given wal group, or 0 if none has been recorded. + * @param walGroup id of the wal group to look up + * @return the recorded timestamp, or 0 if the group has no entry + */ + public long getLastTimeStampOfWalGroup(String walGroup) { + return this.lastTimestamps.get(walGroup) == null ? 0 : lastTimestamps.get(walGroup); + } + + /** + * Get the age of the last shipped op of the given wal group, or 0 if none has been recorded. + * @param walGroup id of the wal group to look up + * @return the recorded age, or 0 if the group has no entry + */ + public long getAgeofLastShippedOp(String walGroup) { + return this.ageOfLastShippedOp.get(walGroup) == null ? 0 : ageOfLastShippedOp.get(walGroup); + } + /** * Convenience method to use the last given timestamp to refresh the age of the last edit. Used * when replication fails and need to keep that metric accurate. * @param walGroupId id of the group to update */ public void refreshAgeOfLastShippedOp(String walGroupId) { - Long lastTimestamp = this.lastTimeStamps.get(walGroupId); + Long lastTimestamp = this.lastTimestamps.get(walGroupId); if (lastTimestamp == null) { - this.lastTimeStamps.put(walGroupId, 0L); + this.lastTimestamps.put(walGroupId, 0L); lastTimestamp = 0L; } if (lastTimestamp > 0) { @@ -204,7 +220,8 @@ public class MetricsSource implements BaseSource { singleSourceSource.decrSizeOfLogQueue(lastQueueSize); singleSourceSource.clear(); globalSourceSource.decrSizeOfHFileRefsQueue(lastHFileRefsQueueSize); - lastTimeStamps.clear(); + lastTimestamps.clear(); + ageOfLastShippedOp.clear(); lastHFileRefsQueueSize = 0; } @@ -230,7 +247,7 @@ public class MetricsSource implements BaseSource { */ public long getTimeStampOfLastShippedOp() { long lastTimestamp = 0L; - for (long ts : lastTimeStamps.values()) { + for (long ts : lastTimestamps.values()) { if (ts > lastTimestamp) { lastTimestamp = ts; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java index 2ead3df99e..51fbc934eb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java @@ -79,19 +79,8 @@ public class ReplicationLoad { long ageOfLastShippedOp = sm.getAgeOfLastShippedOp(); int sizeOfLogQueue = sm.getSizeOfLogQueue(); long timeStampOfLastShippedOp = sm.getTimeStampOfLastShippedOp(); - long replicationLag; - long timePassedAfterLastShippedOp = - EnvironmentEdgeManager.currentTime() - timeStampOfLastShippedOp; - if (sizeOfLogQueue != 0) { - // err on the large side - replicationLag = Math.max(ageOfLastShippedOp, timePassedAfterLastShippedOp); - } else if (timePassedAfterLastShippedOp < 2 * ageOfLastShippedOp) { - replicationLag = ageOfLastShippedOp; // last shipped happen recently - } else { - // last shipped may happen last night, - // so NO real lag although ageOfLastShippedOp is non-zero - replicationLag = 0; - } + long replicationLag = + calculateReplicationDelay(ageOfLastShippedOp, timeStampOfLastShippedOp, sizeOfLogQueue); ClusterStatusProtos.ReplicationLoadSource rLoadSource = replicationLoadSourceMap.get(peerId); if (rLoadSource != null) { @@ -116,6 +105,24 @@ public class ReplicationLoad { replicationLoadSourceMap.values()); } + static long calculateReplicationDelay(long ageOfLastShippedOp, + long timeStampOfLastShippedOp, int sizeOfLogQueue) { + long replicationLag; + long timePassedAfterLastShippedOp = + EnvironmentEdgeManager.currentTime() - timeStampOfLastShippedOp; + if (sizeOfLogQueue > 1) { + // err on the large side + replicationLag = Math.max(ageOfLastShippedOp, timePassedAfterLastShippedOp); + } else if (timePassedAfterLastShippedOp < 2 * ageOfLastShippedOp) { + replicationLag = ageOfLastShippedOp; // last shipped happen recently + } else { + // last shipped may happen last night, 
+      // so NO real lag although ageOfLastShippedOp is non-zero
+      replicationLag = 0;
+    }
+    return replicationLag;
+  }
+
   /**
    * sourceToString
    * @return a string contains sourceReplicationLoad information
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 23966550eb..cae7c211a0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -29,6 +29,7 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.PriorityBlockingQueue;
@@ -501,6 +502,51 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
     return peerBandwidth != 0 ? peerBandwidth : defaultBandwidth;
   }
 
+  /**
+   * Builds a snapshot of the replication progress of every WAL group handled by this source.
+   * <p>
+   * One entry is created per shipper worker, keyed by the peer cluster id, the literal
+   * {@code "=>"} and the WAL group id; the {@link TreeMap} keeps entries sorted by that key.
+   * The size of the WAL currently being read is reported as -1 when the file system lookup
+   * fails with an {@link IOException}.
+   * @return replication status of each WAL group, keyed as described above
+   */
+  @Override
+  public Map<String, ReplicationStatus> getWalGroupStatus() {
+    Map<String, ReplicationStatus> sourceReplicationStatus = new TreeMap<>();
+    // The peer does not depend on the worker, so look it up once instead of twice per iteration.
+    String peerId = this.getPeerClusterId();
+    for (ReplicationSourceShipperThread worker : workerThreads.values()) {
+      String walGroupId = worker.getWalGroupId();
+      long lastTimeStamp = metrics.getLastTimeStampOfWalGroup(walGroupId);
+      long ageOfLastShippedOp = metrics.getAgeofLastShippedOp(walGroupId);
+      int queueSize = queues.get(walGroupId).size();
+      long replicationDelay =
+          ReplicationLoad.calculateReplicationDelay(ageOfLastShippedOp, lastTimeStamp, queueSize);
+      Path currentPath = worker.getCurrentPath();
+      long fileSize;
+      try {
+        fileSize = fs.getContentSummary(currentPath).getLength();
+      } catch (IOException e) {
+        // Best effort: a failed size lookup must not fail the whole snapshot, so the size is
+        // reported as unknown (-1) instead.
+        fileSize = -1;
+      }
+      ReplicationStatus status = ReplicationStatus.newBuilder()
+          .withPeerId(peerId)
+          .withQueueSize(queueSize)
+          .withWalGroup(walGroupId)
+          .withCurrentPath(currentPath)
+          .withCurrentPosition(worker.getCurrentPosition())
+          .withFileSize(fileSize)
+          .withAgeOfLastShippedOp(ageOfLastShippedOp)
+          .withReplicationDelay(replicationDelay)
+          .build();
+      sourceReplicationStatus.put(peerId + "=>" + walGroupId, status);
+    }
+    return sourceReplicationStatus;
+  }
+
   // This thread reads entries from a queue and ships them.
   // Entries are placed onto the queue by ReplicationSourceWALReaderThread
   public class ReplicationSourceShipperThread extends Thread {
@@ -525,6 +571,13 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
       this.source = source;
     }
 
+    /**
+     * @return the WAL group id of this shipper thread
+     */
+    public String getWalGroupId() {
+      return walGroupId;
+    }
+
     @Override
     public void run() {
       setWorkerState(WorkerState.RUNNING);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
index e7569edf80..8bfbca013a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.replication.regionserver;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -127,4 +128,9 @@ public interface ReplicationSourceInterface {
    */
   MetricsSource getSourceMetrics();
 
+  /**
+   * Get the replication status of each WAL group.
+   * @return replication status, one map entry per WAL group
+   */
+  Map<String, ReplicationStatus> getWalGroupStatus();
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
index 03fe229992..ec5e862985 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
@@ -251,7 +252,18 @@ public class ReplicationSourceWALReaderThread extends Thread {
     return entryBatchQueue.take();
   }
 
-  public long getEntrySizeIncludeBulkLoad(Entry entry) {
+  /**
+   * Retrieves a batch of WAL entries from the batch queue, waiting up to the given time if none
+   * is available yet.
+   * @param timeout maximum time to wait, in milliseconds
+   * @return the next batch, or null if the timeout elapsed before one became available
+   * @throws InterruptedException if interrupted while waiting
+   */
+  public WALEntryBatch poll(long timeout) throws InterruptedException {
+    return entryBatchQueue.poll(timeout, TimeUnit.MILLISECONDS);
+  }
+
+  private long getEntrySizeIncludeBulkLoad(Entry entry) {
     WALEdit edit = entry.getEdit();
     WALKey key = entry.getKey();
     return edit.heapSize() + sizeOfStoreFilesIncludeBulkLoad(edit) +
@@ -470,9 +482,5 @@ public class ReplicationSourceWALReaderThread extends Thread {
     private void incrementHeapSize(long increment) {
       heapSize += increment;
     }
-
-    private void setLastPosition(String region, Long sequenceId) {
-      getLastSeqIds().put(region, sequenceId);
-    }
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationStatus.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationStatus.java
new file mode 100644
index 0000000000..41076cc771
--- /dev/null
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationStatus.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Read-only holder for the replication figures of a single WAL group: the peer and WAL group it
+ * belongs to, the WAL path and position currently being read, the size of that file, the queue
+ * size, the age of the last shipped operation and the replication delay.
+ * <p>
+ * Instances are assembled with {@link ReplicationStatusBuilder}, obtained from
+ * {@link #newBuilder()}.
+ */
+@InterfaceAudience.Private
+public final class ReplicationStatus {
+  private final String peerId;
+  private final String walGroup;
+  private final int queueSize;
+  private final Path currentPath;
+  private final long currentPosition;
+  private final long fileSize;
+  private final long ageOfLastShippedOp;
+  private final long replicationDelay;
+
+  /** Copies every value out of the given builder. */
+  private ReplicationStatus(ReplicationStatusBuilder b) {
+    peerId = b.peerId;
+    walGroup = b.walGroup;
+    queueSize = b.queueSize;
+    currentPath = b.currentPath;
+    currentPosition = b.currentPosition;
+    fileSize = b.fileSize;
+    ageOfLastShippedOp = b.ageOfLastShippedOp;
+    replicationDelay = b.replicationDelay;
+  }
+
+  /**
+   * @return a builder whose values all start out as "unknown" placeholders
+   */
+  public static ReplicationStatusBuilder newBuilder() {
+    return new ReplicationStatusBuilder();
+  }
+
+  public String getPeerId() {
+    return peerId;
+  }
+
+  public String getWalGroup() {
+    return walGroup;
+  }
+
+  public int getQueueSize() {
+    return queueSize;
+  }
+
+  public Path getCurrentPath() {
+    return currentPath;
+  }
+
+  public long getCurrentPosition() {
+    return currentPosition;
+  }
+
+  public long getFileSize() {
+    return fileSize;
+  }
+
+  public long getAgeOfLastShippedOp() {
+    return ageOfLastShippedOp;
+  }
+
+  public long getReplicationDelay() {
+    return replicationDelay;
+  }
+
+  /**
+   * Fluent builder for {@link ReplicationStatus}. A value that is never set keeps its
+   * placeholder: {@code "UNKNOWN"} for the strings and the path, -1 for the numbers.
+   */
+  public static class ReplicationStatusBuilder {
+    private String peerId = "UNKNOWN";
+    private String walGroup = "UNKNOWN";
+    private int queueSize = -1;
+    private Path currentPath = new Path("UNKNOWN");
+    private long currentPosition = -1;
+    private long fileSize = -1;
+    private long ageOfLastShippedOp = -1;
+    private long replicationDelay = -1;
+
+    public ReplicationStatusBuilder withPeerId(String peerId) {
+      this.peerId = peerId;
+      return this;
+    }
+
+    public ReplicationStatusBuilder withWalGroup(String walGroup) {
+      this.walGroup = walGroup;
+      return this;
+    }
+
+    public ReplicationStatusBuilder withQueueSize(int queueSize) {
+      this.queueSize = queueSize;
+      return this;
+    }
+
+    public ReplicationStatusBuilder withCurrentPath(Path currentPath) {
+      this.currentPath = currentPath;
+      return this;
+    }
+
+    public ReplicationStatusBuilder withCurrentPosition(long currentPosition) {
+      this.currentPosition = currentPosition;
+      return this;
+    }
+
+    public ReplicationStatusBuilder withFileSize(long fileSize) {
+      this.fileSize = fileSize;
+      return this;
+    }
+
+    public ReplicationStatusBuilder withAgeOfLastShippedOp(long ageOfLastShippedOp) {
+      this.ageOfLastShippedOp = ageOfLastShippedOp;
+      return this;
+    }
+
+    public ReplicationStatusBuilder withReplicationDelay(long replicationDelay) {
+      this.replicationDelay = replicationDelay;
+      return this;
+    }
+
+    /**
+     * @return a new status carrying the values currently held by this builder
+     */
+    public ReplicationStatus build() {
+      return new ReplicationStatus(this);
+    }
+  }
+}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestGetReplicationLoad.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestGetReplicationLoad.java
new file mode 100644
index 0000000000..bad92b80a9
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestGetReplicationLoad.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CoordinatedStateManager;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
+import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos;
+import org.apache.hadoop.hbase.replication.ReplicationLoadSource;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.zookeeper.KeeperException;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Feeds a hand-built region server report, carrying two replication load sources, to the master
+ * and checks what {@link HMaster#getReplicationLoad(ServerName[])} returns for it.
+ */
+@Category({ MasterTests.class, MediumTests.class })
+public class TestGetReplicationLoad {
+  private static final Logger LOG = LoggerFactory.getLogger(TestGetReplicationLoad.class);
+
+  private static HBaseTestingUtility TEST_UTIL;
+  private static MiniHBaseCluster cluster;
+  private static HMaster master;
+  private static ReplicationAdmin admin;
+
+  private static final String ID_1 = "1";
+  private static final String ID_2 = "2";
+  private static final String KEY_1 = "127.0.0.1:2181:/hbase";
+  private static final String KEY_2 = "127.0.0.1:2181:/hbase2";
+
+  /**
+   * Master that never sends its own region server report ({@code tryRegionServerReport} is a
+   * no-op); presumably so that the report injected by the test is not overwritten.
+   */
+  public static class MyMaster extends HMaster {
+    public MyMaster(Configuration conf, CoordinatedStateManager csm)
+        throws IOException, KeeperException, InterruptedException {
+      super(conf, csm);
+    }
+
+    @Override
+    protected void tryRegionServerReport(long reportStartTime, long reportEndTime) {
+      // do nothing
+    }
+  }
+
+  @BeforeClass
+  public static void startCluster() throws Exception {
+    LOG.info("Starting cluster");
+    TEST_UTIL = new HBaseTestingUtility();
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT);
+    TEST_UTIL.startMiniCluster(1, 1, 1, null, MyMaster.class, null);
+    cluster = TEST_UTIL.getHBaseCluster();
+    LOG.info("Waiting for active/ready master");
+    cluster.waitForActiveAndReadyMaster();
+    master = cluster.getMaster();
+    admin = new ReplicationAdmin(conf);
+  }
+
+  @AfterClass
+  public static void after() throws Exception {
+    if (admin != null) {
+      admin.close();
+    }
+    if (TEST_UTIL != null) {
+      TEST_UTIL.shutdownMiniCluster();
+    }
+  }
+
+  @Test
+  public void testGetReplicationMetrics() throws Exception {
+    String peer1 = "test1", peer2 = "test2";
+    long ageOfLastShippedOp = 2, replicationLag = 3, timeStampOfLastShippedOp = 4;
+    int sizeOfLogQueue = 5;
+    RegionServerStatusProtos.RegionServerReportRequest.Builder request =
+        RegionServerStatusProtos.RegionServerReportRequest.newBuilder();
+    ServerName serverName = cluster.getMaster(0).getServerName();
+    request.setServer(ProtobufUtil.toServerName(serverName));
+    ClusterStatusProtos.ReplicationLoadSource rload1 = ClusterStatusProtos.ReplicationLoadSource
+        .newBuilder().setPeerID(peer1).setAgeOfLastShippedOp(ageOfLastShippedOp)
+        .setReplicationLag(replicationLag).setTimeStampOfLastShippedOp(timeStampOfLastShippedOp)
+        .setSizeOfLogQueue(sizeOfLogQueue).build();
+    ClusterStatusProtos.ReplicationLoadSource rload2 =
+        ClusterStatusProtos.ReplicationLoadSource.newBuilder().setPeerID(peer2)
+            .setAgeOfLastShippedOp(ageOfLastShippedOp + 1).setReplicationLag(replicationLag + 1)
+            .setTimeStampOfLastShippedOp(timeStampOfLastShippedOp + 1)
+            .setSizeOfLogQueue(sizeOfLogQueue + 1).build();
+    ClusterStatusProtos.ServerLoad sl = ClusterStatusProtos.ServerLoad.newBuilder()
+        .addReplLoadSource(rload1).addReplLoadSource(rload2).build();
+    request.setLoad(sl);
+
+    ReplicationPeerConfig peerConfig_1 = new ReplicationPeerConfig();
+    peerConfig_1.setClusterKey(KEY_1);
+    ReplicationPeerConfig peerConfig_2 = new ReplicationPeerConfig();
+    peerConfig_2.setClusterKey(KEY_2);
+    admin.addPeer(ID_1, peerConfig_1);
+    admin.addPeer(ID_2, peerConfig_2);
+
+    master.getMasterRpcServices().regionServerReport(null, request.build());
+    HashMap<String, List<Pair<ServerName, ReplicationLoadSource>>> replicationLoad =
+        master.getReplicationLoad(new ServerName[] { serverName });
+    assertEquals("peer size ", 2, replicationLoad.size());
+    assertEquals("load size ", 1, replicationLoad.get(peer1).size());
+    assertEquals("log queue size of peer1", sizeOfLogQueue,
+      replicationLoad.get(peer1).get(0).getSecond().getSizeOfLogQueue());
+    assertEquals("replication lag of peer2", replicationLag + 1,
+      replicationLoad.get(peer2).get(0).getSecond().getReplicationLag());
+
+    master.stopMaster();
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
index ad8c52f4f2..a5c61f20f9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
@@ -19,7 +19,9 @@
 package org.apache.hadoop.hbase.replication;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 
 import org.apache.hadoop.conf.Configuration;
@@ -30,6 +32,7 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
 import
org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
 import org.apache.hadoop.hbase.util.Pair;
 
 /**
@@ -106,4 +109,9 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
   public MetricsSource getSourceMetrics() {
     return metrics;
   }
+
+  @Override
+  public Map<String, ReplicationStatus> getWalGroupStatus() {
+    return new HashMap<>();
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
index d0f40a6bd5..ddd319586b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
@@ -77,6 +77,7 @@ public class TestReplicationBase {
   protected static final byte[] famName = Bytes.toBytes("f");
   protected static final byte[] row = Bytes.toBytes("row");
   protected static final byte[] noRepfamName = Bytes.toBytes("norep");
+  protected static final String PEER_ID2 = "2";
 
   /**
    * @throws java.lang.Exception
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationMetricsforUI.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationMetricsforUI.java
new file mode 100644
index 0000000000..d507b76a73
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationMetricsforUI.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.util.Map;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Checks the per-WAL-group {@link ReplicationStatus} values a region server exposes through
+ * {@code getWalGroupsReplicationStatus()}: after an initial put, after a batch of puts and after
+ * a WAL roll.
+ */
+@Category({ ReplicationTests.class, MediumTests.class })
+public class TestReplicationMetricsforUI extends TestReplicationBase {
+
+  private static final byte[] qualName = Bytes.toBytes("q");
+
+  @Test
+  public void testReplicationMetrics() throws Exception {
+    try (Admin hbaseAdmin = utility1.getConnection().getAdmin()) {
+      Put p = new Put(Bytes.toBytes("starter"));
+      p.addColumn(famName, qualName, Bytes.toBytes("value help to test replication delay"));
+      htable1.put(p);
+      // make sure replication done
+      while (htable2.get(new Get(Bytes.toBytes("starter"))).size() == 0) {
+        Thread.sleep(500);
+      }
+      // sleep 5 seconds to make sure timePassedAfterLastShippedOp > 2 * ageOfLastShippedOp
+      Thread.sleep(5000);
+      HRegionServer rs = utility1.getRSForFirstRegionInTable(tableName);
+      Map<String, ReplicationStatus> metrics = rs.getWalGroupsReplicationStatus();
+      Assert.assertEquals("metric size ", 1, metrics.size());
+      for (Map.Entry<String, ReplicationStatus> metric : metrics.entrySet()) {
+        Assert.assertEquals("peerId", PEER_ID2, metric.getValue().getPeerId());
+        Assert.assertEquals("queue length", 1, metric.getValue().getQueueSize());
+        Assert.assertEquals("replication delay", 0, metric.getValue().getReplicationDelay());
+        long pos = metric.getValue().getCurrentPosition();
+        // Semantics are a bit different in branch-1: If not started, pos will be -1
+        if (pos == -1) {
+          pos = 0;
+        }
+        Assert.assertTrue("current position should be >= 0, is " + pos, pos >= 0);
+      }
+      for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
+        p = new Put(Bytes.toBytes(Integer.toString(i)));
+        p.addColumn(famName, qualName, Bytes.toBytes("value help to test replication delay " + i));
+        htable1.put(p);
+      }
+      while (htable2.get(new Get(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH - 1))))
+          .size() == 0) {
+        Thread.sleep(500);
+      }
+      rs = utility1.getRSForFirstRegionInTable(tableName);
+      metrics = rs.getWalGroupsReplicationStatus();
+      Path lastPath = null;
+      for (Map.Entry<String, ReplicationStatus> metric : metrics.entrySet()) {
+        lastPath = metric.getValue().getCurrentPath();
+        Assert.assertEquals("peerId", PEER_ID2, metric.getValue().getPeerId());
+        Assert.assertTrue("age of Last Shipped Op should be > 0 ",
+          metric.getValue().getAgeOfLastShippedOp() > 0);
+        long pos = metric.getValue().getCurrentPosition();
+        Assert.assertTrue("current position should be >= 0, is " + pos, pos >= 0);
+      }
+
+      hbaseAdmin.rollWALWriter(rs.getServerName());
+      p = new Put(Bytes.toBytes("trigger"));
+      p.addColumn(famName, qualName, Bytes.toBytes("value help to test replication delay"));
+      htable1.put(p);
+      // make sure replication rolled to a new log
+      while (htable2.get(new Get(Bytes.toBytes("trigger"))).size() == 0) {
+        Thread.sleep(500);
+      }
+      // sleep 5 seconds to make sure timePassedAfterLastShippedOp > 2 * ageOfLastShippedOp
+      Thread.sleep(5000);
+      metrics = rs.getWalGroupsReplicationStatus();
+      for (Map.Entry<String, ReplicationStatus> metric : metrics.entrySet()) {
+        Assert.assertEquals("replication delay", 0, metric.getValue().getReplicationDelay());
+        long pos = metric.getValue().getCurrentPosition();
+        Assert.assertTrue("current position should be >= 0, is " + pos, pos >= 0);
+        Assert.assertNotEquals("current path", lastPath, metric.getValue().getCurrentPath());
+      }
+    }
+  }
+}
\ No newline at end of file
-- 
2.20.1