From b156e0eccc0ce0bbe64bbdf5c1e8c29ac7065ddf Mon Sep 17 00:00:00 2001 From: jingyuntian Date: Tue, 27 Mar 2018 11:32:34 +0800 Subject: [PATCH] add replicationUI for regionserver --- .../hbase/tmpl/regionserver/RSStatusTmpl.jamon | 4 + .../tmpl/regionserver/ReplicationStatusTmpl.jamon | 105 ++++++++++++++++++++ .../hadoop/hbase/regionserver/HRegionServer.java | 16 ++++ .../hbase/regionserver/ReplicationService.java | 2 +- .../regionserver/ReplicationSourceService.java | 6 ++ .../replication/regionserver/MetricsSource.java | 21 ++++ .../replication/regionserver/ReplicationLoad.java | 32 ++++--- .../regionserver/ReplicationSource.java | 31 ++++++ .../regionserver/ReplicationSourceInterface.java | 9 ++ .../regionserver/ReplicationSourceWALReader.java | 16 ++++ .../regionserver/ReplicationStatus.java | 106 +++++++++++++++++++++ 11 files changed, 334 insertions(+), 14 deletions(-) create mode 100644 hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/ReplicationStatusTmpl.jamon create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationStatus.java diff --git a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/RSStatusTmpl.jamon b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/RSStatusTmpl.jamon index c9bfcc9..646d835 100644 --- a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/RSStatusTmpl.jamon +++ b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/RSStatusTmpl.jamon @@ -121,6 +121,10 @@ org.apache.hadoop.hbase.zookeeper.MasterAddressTracker; <& RegionListTmpl; regionServer = regionServer; onlineRegions = onlineRegions; &> +
+

Replication Status

+ <& ReplicationStatusTmpl; regionServer = regionServer; &> +

Software Attributes

diff --git a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/ReplicationStatusTmpl.jamon b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/ReplicationStatusTmpl.jamon new file mode 100644 index 0000000..1114fc6 --- /dev/null +++ b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/ReplicationStatusTmpl.jamon @@ -0,0 +1,105 @@ +<%doc> + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +<%args> + HRegionServer regionServer; + +<%import> + java.util.*; + java.util.Map.Entry; + org.apache.hadoop.hbase.procedure2.util.StringUtils; + org.apache.hadoop.hbase.regionserver.HRegionServer; + org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus; + + +<%java> + Map walGroupsReplicationStatus = regionServer.getWalGroupsReplicationStatus(); + + +<%if (walGroupsReplicationStatus != null && walGroupsReplicationStatus.size() > 0) %> + +
+ +
+
+ <& currentLog; metrics = walGroupsReplicationStatus; &> +
+
+ <& replicationDelay; metrics = walGroupsReplicationStatus; &> +
+
+
+

If the size of log is 0, it means we are replicating current HLog, thus we can't get accurate size since it's not closed yet. + If the size of log is -1, it meas the log is moved to archive, its path will update later

+ +<%else> +

No Replication Metrics for Peers

+ + +<%def currentLog> +<%args> + Map metrics; + + + + + + + + + + + <%for Map.Entry entry: metrics.entrySet() %> + + + + + + + + + +
PeerIdWalGroupCurrent LogSizeQueue SizeOffset
<% entry.getValue().getPeerId() %><% entry.getValue().getWalGroup() %><% entry.getValue().getCurrentPath() %> <% StringUtils.humanSize(entry.getValue().getFileSize()) %><% entry.getValue().getQueueSize() %><% StringUtils.humanSize(entry.getValue().getCurrentPosition()) %>
+ + +<%def replicationDelay> +<%args> + Map metrics; + + + + + + + + + + <%for Map.Entry entry: metrics.entrySet() %> + + + + + + + + +
PeerIdWalGroupCurrent LogLast Shipped AgeReplication Delay
<% entry.getValue().getPeerId() %><% entry.getValue().getWalGroup() %><% entry.getValue().getCurrentPath() %> <% StringUtils.humanTimeDiff(entry.getValue().getAgeOfLastShippedOp()) %><% StringUtils.humanTimeDiff(entry.getValue().getReplicationDelay()) %>
+ \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index d29bce7..ca0b607 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -135,6 +135,8 @@ import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad; import org.apache.hadoop.hbase.replication.regionserver.ReplicationObserver; +import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface; +import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus; import org.apache.hadoop.hbase.security.Superusers; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; @@ -2982,6 +2984,20 @@ public class HRegionServer extends HasThread implements return service; } + public Map getWalGroupsReplicationStatus(){ + if(!this.isOnline()){ + return null; + } + List allSources = new ArrayList<>(); + allSources.addAll(replicationSourceHandler.getReplicationManager().getSources()); + allSources.addAll(replicationSourceHandler.getReplicationManager().getOldSources()); + Map walGroupsReplicationStatus = new TreeMap<>(); + for(ReplicationSourceInterface source: allSources){ + walGroupsReplicationStatus.putAll(source.getWalGroupStatus()); + } + return walGroupsReplicationStatus; + } + /** * Utility for constructing an instance of the passed HRegionServer class. * diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java index c34231d..e9bbaea 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java @@ -54,5 +54,5 @@ public interface ReplicationService { /** * Refresh and Get ReplicationLoad */ - public ReplicationLoad refreshAndGetReplicationLoad(); + ReplicationLoad refreshAndGetReplicationLoad(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java index 23ba773..796137a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.regionserver; import org.apache.hadoop.hbase.replication.regionserver.PeerProcedureHandler; +import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager; import org.apache.yetus.audience.InterfaceAudience; /** @@ -32,4 +33,9 @@ public interface ReplicationSourceService extends ReplicationService { * Returns a Handler to handle peer procedures. */ PeerProcedureHandler getPeerProcedureHandler(); + + /** + * Returns the replication manager + */ + ReplicationSourceManager getReplicationManager(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java index a59dd72..6d4911a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java @@ -40,6 +40,7 @@ public class MetricsSource implements BaseSource { // tracks last shipped timestamp for each wal group private Map lastTimestamps = new HashMap<>(); + private Map ageOfLastShippedOp = new HashMap<>(); private long lastHFileRefsQueueSize = 0; private String id; @@ -82,10 +83,30 @@ public class MetricsSource implements BaseSource { long age = EnvironmentEdgeManager.currentTime() - timestamp; singleSourceSource.setLastShippedAge(age); globalSourceSource.setLastShippedAge(age); + this.ageOfLastShippedOp.put(walGroup, age); this.lastTimestamps.put(walGroup, timestamp); } /** + * get the last timestamp of given wal group. If the walGroup is null, return current time. + * @param walGroup + * @return timeStamp + */ + public long getLastTimeStampOfWalGroup(String walGroup){ + return this.lastTimestamps.get(walGroup) == null ? EnvironmentEdgeManager.currentTime() + : lastTimestamps.get(walGroup); + } + + /** + * get age of last shipped op of given wal group. If the walGroup is null, return 0 + * @param walGroup + * @return age + */ + public long getAgeofLastShippedOp(String walGroup){ + return this.ageOfLastShippedOp.get(walGroup) == null? 0: ageOfLastShippedOp.get(walGroup); + } + + /** * Convenience method to use the last given timestamp to refresh the age of the last edit. Used * when replication fails and need to keep that metric accurate. * @param walGroupId id of the group to update diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java index 219c03d..c1f2702 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java @@ -79,19 +79,7 @@ public class ReplicationLoad { long ageOfLastShippedOp = sm.getAgeOfLastShippedOp(); int sizeOfLogQueue = sm.getSizeOfLogQueue(); long timeStampOfLastShippedOp = sm.getTimestampOfLastShippedOp(); - long replicationLag; - long timePassedAfterLastShippedOp = - EnvironmentEdgeManager.currentTime() - timeStampOfLastShippedOp; - if (sizeOfLogQueue != 0) { - // err on the large side - replicationLag = Math.max(ageOfLastShippedOp, timePassedAfterLastShippedOp); - } else if (timePassedAfterLastShippedOp < 2 * ageOfLastShippedOp) { - replicationLag = ageOfLastShippedOp; // last shipped happen recently - } else { - // last shipped may happen last night, - // so NO real lag although ageOfLastShippedOp is non-zero - replicationLag = 0; - } + long replicationLag = calculateReplicationDelay(ageOfLastShippedOp, timeStampOfLastShippedOp, sizeOfLogQueue); ClusterStatusProtos.ReplicationLoadSource rLoadSource = replicationLoadSourceMap.get(peerId); if (rLoadSource != null) { @@ -114,6 +102,24 @@ public class ReplicationLoad { this.replicationLoadSourceList = new ArrayList<>(replicationLoadSourceMap.values()); } + public static long calculateReplicationDelay(long ageOfLastShippedOp, + long timeStampOfLastShippedOp, int sizeOfLogQueue) { + long replicationLag; + long timePassedAfterLastShippedOp = + EnvironmentEdgeManager.currentTime() - timeStampOfLastShippedOp; + if (sizeOfLogQueue != 0) { + // err on the large side + replicationLag = Math.max(ageOfLastShippedOp, timePassedAfterLastShippedOp); + } else if (timePassedAfterLastShippedOp < 2 * ageOfLastShippedOp) { + replicationLag = ageOfLastShippedOp; // last shipped happen recently + } else { + // last shipped may happen last night, + // so NO real lag although ageOfLastShippedOp is non-zero + replicationLag = 0; + } + return replicationLag; + } + /** * sourceToString * @return a string contains sourceReplicationLoad information diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 236c575..ce12f7d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -25,6 +25,7 @@ import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.PriorityBlockingQueue; @@ -310,6 +311,36 @@ public class ReplicationSource implements ReplicationSourceInterface { } } + @Override + public Map getWalGroupStatus() { + Map sourceReplicationStatus = new TreeMap<>(); + ReplicationStatus status; + String walGroupId; + ReplicationSourceShipper shipper; + long lastTimeStamp, ageOfLastShippedOp, replicationDelay, fileSize; + for (Map.Entry walGroupShipper : workerThreads.entrySet()) { + walGroupId = walGroupShipper.getKey(); + shipper = walGroupShipper.getValue(); + lastTimeStamp = metrics.getLastTimeStampOfWalGroup(walGroupId); + ageOfLastShippedOp = metrics.getAgeofLastShippedOp(walGroupId); + int queueSize = queues.get(walGroupId).size(); + replicationDelay = + ReplicationLoad.calculateReplicationDelay(ageOfLastShippedOp, lastTimeStamp, queueSize); + try { + fileSize = fs.getContentSummary(shipper.entryReader.getCurrentLogPath()).getLength(); + } catch (IOException e) { + fileSize = -1; + } + status = new ReplicationStatus(); + status.setPeerId(this.peerId).setQueueSize(queueSize).setWalGroup(walGroupId) + .setCurrentPath(shipper.getCurrentPath()).setCurrentPosition(shipper.getCurrentPosition()) + .setFileSize(fileSize).setAgeOfLastShippedOp(ageOfLastShippedOp) + .setReplicationDelay(replicationDelay); + sourceReplicationStatus.put(peerId + "=>" + walGroupId, status); + } + return sourceReplicationStatus; + } + protected ReplicationSourceShipper createNewShipper(String walGroupId, PriorityBlockingQueue queue) { return new ReplicationSourceShipper(conf, walGroupId, queue, this); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java index 090b465..78a4a07 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.io.IOException; import java.util.List; +import java.util.Map; import java.util.UUID; import org.apache.hadoop.conf.Configuration; @@ -168,6 +169,14 @@ public interface ReplicationSourceInterface { ServerName getServerWALsBelongTo(); /** + * get the stat of replication for each wal group. + * @return stat of replication + */ + default Map getWalGroupStatus() { + return null; + } + + /** * @return whether this is a replication source for recovery. */ default boolean isRecovered() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java index 7ba347f..f2e3865 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java @@ -76,6 +76,8 @@ class ReplicationSourceWALReader extends Thread { private AtomicLong totalBufferUsed; private long totalBufferQuota; + private Path currentLogPath; + /** * Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the * entries, and puts them on a batch queue. @@ -129,6 +131,7 @@ class ReplicationSourceWALReader extends Thread { if (!checkQuota()) { continue; } + updateCurrentLogPath(entryStream.getCurrentPath()); WALEntryBatch batch = readWALEntries(entryStream); currentPosition = entryStream.getPosition(); if (batch != null) { @@ -158,6 +161,19 @@ class ReplicationSourceWALReader extends Thread { } } + private void updateCurrentLogPath(Path currentLogPath){ + if(this.currentLogPath != currentLogPath){ + this.currentLogPath = currentLogPath; + } + } + + /** + * return current log path that the reader opened. + */ + public Path getCurrentLogPath(){ + return this.currentLogPath; + } + // returns true if we reach the size limit for batch, i.e, we need to finish the batch and return. protected final boolean addEntryToBatch(WALEntryBatch batch, Entry entry) { WALEdit edit = entry.getEdit(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationStatus.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationStatus.java new file mode 100644 index 0000000..0df5997 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationStatus.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.replication.regionserver; + +import org.apache.hadoop.fs.Path; +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Private +public class ReplicationStatus { + private String peerId; + private String walGroup; + private Path currentPath; + private int queueSize; + private long ageOfLastShippedOp; + private long replicationDelay; + private long currentPosition; + private long fileSize; + + public long getCurrentPosition() { + return currentPosition; + } + + public ReplicationStatus setCurrentPosition(long currentPosition) { + this.currentPosition = currentPosition; + return this; + } + + public long getFileSize() { + return fileSize; + } + + public ReplicationStatus setFileSize(long fileSize) { + this.fileSize = fileSize; + return this; + } + + public String getPeerId() { + return peerId; + } + + public ReplicationStatus setPeerId(String peerId) { + this.peerId = peerId; + return this; + } + + public String getWalGroup() { + return walGroup; + } + + public ReplicationStatus setWalGroup(String walGroup) { + this.walGroup = walGroup; + return this; + } + + public int getQueueSize() { + return queueSize; + } + + public ReplicationStatus setQueueSize(int queueSize) { + this.queueSize = queueSize; + return this; + } + + public long getAgeOfLastShippedOp() { + return ageOfLastShippedOp; + } + + public ReplicationStatus setAgeOfLastShippedOp(long ageOfLastShippedOp) { + this.ageOfLastShippedOp = ageOfLastShippedOp; + return this; + } + + public long getReplicationDelay() { + return replicationDelay; + } + + public ReplicationStatus setReplicationDelay(long replicationDelay) { + this.replicationDelay = replicationDelay; + return this; + } + + public Path getCurrentPath() { + return currentPath; + } + + public ReplicationStatus setCurrentPath(Path currentPath) { + this.currentPath = currentPath; + return this; + } +} -- 2.7.4