From f6eff6222c741a7214a34923d4f91692ec279218 Mon Sep 17 00:00:00 2001 From: jingyuntian Date: Tue, 26 Jun 2018 18:19:47 +0800 Subject: [PATCH] HBASE-20193 Basic Replication Web UI - Regionserver --- .../hbase/tmpl/regionserver/RSStatusTmpl.jamon | 4 + .../tmpl/regionserver/ReplicationStatusTmpl.jamon | 104 +++++++++++++++++++++ .../hadoop/hbase/regionserver/HRegionServer.java | 16 ++++ .../hbase/regionserver/ReplicationService.java | 2 +- .../regionserver/ReplicationSourceService.java | 6 ++ .../replication/regionserver/MetricsSource.java | 21 +++++ .../replication/regionserver/ReplicationLoad.java | 33 ++++--- .../regionserver/ReplicationSource.java | 45 +++++++++ .../regionserver/ReplicationSourceInterface.java | 9 ++ .../regionserver/ReplicationSourceManager.java | 3 +- .../regionserver/ReplicationSourceShipper.java | 13 ++- .../regionserver/ReplicationSourceWALReader.java | 21 +++++ .../regionserver/ReplicationStatus.java | 98 +++++++++++++++++++ .../replication/TestReplicationMetricsforUI.java | 97 +++++++++++++++++++ 14 files changed, 456 insertions(+), 16 deletions(-) create mode 100644 hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/ReplicationStatusTmpl.jamon create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationStatus.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationMetricsforUI.java diff --git a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/RSStatusTmpl.jamon b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/RSStatusTmpl.jamon index c9bfcc9..646d835 100644 --- a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/RSStatusTmpl.jamon +++ b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/RSStatusTmpl.jamon @@ -121,6 +121,10 @@ org.apache.hadoop.hbase.zookeeper.MasterAddressTracker; <& RegionListTmpl; regionServer = regionServer; onlineRegions = onlineRegions; &> +
+

Replication Status

+ <& ReplicationStatusTmpl; regionServer = regionServer; &> +

Software Attributes

diff --git a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/ReplicationStatusTmpl.jamon b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/ReplicationStatusTmpl.jamon new file mode 100644 index 0000000..8069866 --- /dev/null +++ b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/ReplicationStatusTmpl.jamon @@ -0,0 +1,104 @@ +<%doc> + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +<%args> + HRegionServer regionServer; + +<%import> + java.util.*; + java.util.Map.Entry; + org.apache.hadoop.hbase.procedure2.util.StringUtils; + org.apache.hadoop.hbase.regionserver.HRegionServer; + org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus; + + +<%java> + Map walGroupsReplicationStatus = regionServer.getWalGroupsReplicationStatus(); + + +<%if (walGroupsReplicationStatus != null && walGroupsReplicationStatus.size() > 0) %> + +
+ +
+
+ <& currentLog; metrics = walGroupsReplicationStatus; &> +
+
+ <& replicationDelay; metrics = walGroupsReplicationStatus; &> +
+
+
+

If the size of log is 0, it means we are replicating current HLog, thus we can't get accurate size since it's not closed yet.

+ +<%else> +

No Replication Metrics for Peers

+ + +<%def currentLog> +<%args> + Map metrics; + + + + + + + + + + + <%for Map.Entry entry: metrics.entrySet() %> + + + + + + + + + +
PeerIdWalGroupCurrent LogSizeQueue SizeOffset
<% entry.getValue().getPeerId() %><% entry.getValue().getWalGroup() %><% entry.getValue().getCurrentPath() %> <% StringUtils.humanSize(entry.getValue().getFileSize()) %><% entry.getValue().getQueueSize() %><% StringUtils.humanSize(entry.getValue().getCurrentPosition()) %>
+ + +<%def replicationDelay> +<%args> + Map metrics; + + + + + + + + + + <%for Map.Entry entry: metrics.entrySet() %> + + + + + + + + +
PeerIdWalGroupCurrent LogLast Shipped AgeReplication Delay
<% entry.getValue().getPeerId() %><% entry.getValue().getWalGroup() %><% entry.getValue().getCurrentPath() %> <% StringUtils.humanTimeDiff(entry.getValue().getAgeOfLastShippedOp()) %><% StringUtils.humanTimeDiff(entry.getValue().getReplicationDelay()) %>
+ \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 054bebd..a940068 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -134,6 +134,8 @@ import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad; import org.apache.hadoop.hbase.replication.regionserver.ReplicationObserver; +import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface; +import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus; import org.apache.hadoop.hbase.security.Superusers; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; @@ -2985,6 +2987,20 @@ public class HRegionServer extends HasThread implements return service; } + public Map getWalGroupsReplicationStatus(){ + if(!this.isOnline()){ + return null; + } + List allSources = new ArrayList<>(); + allSources.addAll(replicationSourceHandler.getReplicationManager().getSources()); + allSources.addAll(replicationSourceHandler.getReplicationManager().getOldSources()); + Map walGroupsReplicationStatus = new TreeMap<>(); + for(ReplicationSourceInterface source: allSources){ + walGroupsReplicationStatus.putAll(source.getWalGroupStatus()); + } + return walGroupsReplicationStatus; + } + /** * Utility for constructing an instance of the passed HRegionServer class. * diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java index c34231d..e9bbaea 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java @@ -54,5 +54,5 @@ public interface ReplicationService { /** * Refresh and Get ReplicationLoad */ - public ReplicationLoad refreshAndGetReplicationLoad(); + ReplicationLoad refreshAndGetReplicationLoad(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java index 23ba773..796137a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.regionserver; import org.apache.hadoop.hbase.replication.regionserver.PeerProcedureHandler; +import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager; import org.apache.yetus.audience.InterfaceAudience; /** @@ -32,4 +33,9 @@ public interface ReplicationSourceService extends ReplicationService { * Returns a Handler to handle peer procedures. */ PeerProcedureHandler getPeerProcedureHandler(); + + /** + * Returns the replication manager + */ + ReplicationSourceManager getReplicationManager(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java index 7bc7084..c9758ce 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java @@ -42,6 +42,7 @@ public class MetricsSource implements BaseSource { // tracks last shipped timestamp for each wal group private Map lastTimestamps = new HashMap<>(); + private Map ageOfLastShippedOp = new HashMap<>(); private long lastHFileRefsQueueSize = 0; private String id; @@ -87,6 +88,7 @@ public class MetricsSource implements BaseSource { long age = EnvironmentEdgeManager.currentTime() - timestamp; singleSourceSource.setLastShippedAge(age); globalSourceSource.setLastShippedAge(age); + this.ageOfLastShippedOp.put(walGroup, age); this.lastTimestamps.put(walGroup, timestamp); } @@ -104,6 +106,25 @@ public class MetricsSource implements BaseSource { } /** + * get the last timestamp of given wal group. If the walGroup is null, return current time. + * @param walGroup which group we are getting + * @return timeStamp + */ + public long getLastTimeStampOfWalGroup(String walGroup) { + return this.lastTimestamps.get(walGroup) == null ? EnvironmentEdgeManager.currentTime() + : lastTimestamps.get(walGroup); + } + + /** + * get age of last shipped op of given wal group. If the walGroup is null, return 0 + * @param walGroup which group we are getting + * @return age + */ + public long getAgeofLastShippedOp(String walGroup) { + return this.ageOfLastShippedOp.get(walGroup) == null ? 0 : ageOfLastShippedOp.get(walGroup); + } + + /** * Convenience method to use the last given timestamp to refresh the age of the last edit. Used * when replication fails and need to keep that metric accurate. * @param walGroupId id of the group to update diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java index 219c03d..9e3b16b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java @@ -79,19 +79,8 @@ public class ReplicationLoad { long ageOfLastShippedOp = sm.getAgeOfLastShippedOp(); int sizeOfLogQueue = sm.getSizeOfLogQueue(); long timeStampOfLastShippedOp = sm.getTimestampOfLastShippedOp(); - long replicationLag; - long timePassedAfterLastShippedOp = - EnvironmentEdgeManager.currentTime() - timeStampOfLastShippedOp; - if (sizeOfLogQueue != 0) { - // err on the large side - replicationLag = Math.max(ageOfLastShippedOp, timePassedAfterLastShippedOp); - } else if (timePassedAfterLastShippedOp < 2 * ageOfLastShippedOp) { - replicationLag = ageOfLastShippedOp; // last shipped happen recently - } else { - // last shipped may happen last night, - // so NO real lag although ageOfLastShippedOp is non-zero - replicationLag = 0; - } + long replicationLag = + calculateReplicationDelay(ageOfLastShippedOp, timeStampOfLastShippedOp, sizeOfLogQueue); ClusterStatusProtos.ReplicationLoadSource rLoadSource = replicationLoadSourceMap.get(peerId); if (rLoadSource != null) { @@ -114,6 +103,24 @@ public class ReplicationLoad { this.replicationLoadSourceList = new ArrayList<>(replicationLoadSourceMap.values()); } + public static long calculateReplicationDelay(long ageOfLastShippedOp, + long timeStampOfLastShippedOp, int sizeOfLogQueue) { + long replicationLag; + long timePassedAfterLastShippedOp = + EnvironmentEdgeManager.currentTime() - timeStampOfLastShippedOp; + if (sizeOfLogQueue > 1) { + // err on the large side + replicationLag = Math.max(ageOfLastShippedOp, timePassedAfterLastShippedOp); + } else if (timePassedAfterLastShippedOp < 2 * ageOfLastShippedOp) { + replicationLag = ageOfLastShippedOp; // last shipped happen recently + } else { + // last shipped may happen last night, + // so NO real lag although ageOfLastShippedOp is non-zero + replicationLag = 0; + } + return replicationLag; + } + /** * sourceToString * @return a string contains sourceReplicationLoad information diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index b63712b..b396370 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -17,6 +17,9 @@ */ package org.apache.hadoop.hbase.replication.regionserver; +import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.getArchivedLogPath; + +import java.io.FileNotFoundException; import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; @@ -25,6 +28,7 @@ import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.PriorityBlockingQueue; @@ -54,6 +58,7 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter; import org.apache.hadoop.hbase.replication.WALEntryFilter; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; @@ -308,6 +313,46 @@ public class ReplicationSource implements ReplicationSourceInterface { } } + @Override + public Map getWalGroupStatus() { + Map sourceReplicationStatus = new TreeMap<>(); + ReplicationStatus status; + String walGroupId; + ReplicationSourceShipper shipper; + long lastTimeStamp, ageOfLastShippedOp, replicationDelay, fileSize; + for (Map.Entry walGroupShipper : workerThreads.entrySet()) { + walGroupId = walGroupShipper.getKey(); + shipper = walGroupShipper.getValue(); + lastTimeStamp = metrics.getLastTimeStampOfWalGroup(walGroupId); + ageOfLastShippedOp = metrics.getAgeofLastShippedOp(walGroupId); + int queueSize = queues.get(walGroupId).size(); + replicationDelay = + ReplicationLoad.calculateReplicationDelay(ageOfLastShippedOp, lastTimeStamp, queueSize); + Path currentPath = shipper.getCurrentPath(); + try { + fileSize = fs.getContentSummary(currentPath).getLength(); + } catch (FileNotFoundException e) { + try { + currentPath = getArchivedLogPath(currentPath, conf); + fileSize = fs.getContentSummary(currentPath).getLength(); + } catch (IOException ioe) { + LOG.warn("should not get any exception here...", ioe); + fileSize = -1; + } + } catch (IOException e) { + LOG.warn("should not get any exception here...", e); + fileSize = -1; + } + status = new ReplicationStatus(); + status.setPeerId(this.peerId).setQueueSize(queueSize).setWalGroup(walGroupId) + .setCurrentPath(currentPath).setCurrentPosition(shipper.getCurrentPosition()) + .setFileSize(fileSize).setAgeOfLastShippedOp(ageOfLastShippedOp) + .setReplicationDelay(replicationDelay); + sourceReplicationStatus.put(peerId + "=>" + walGroupId, status); + } + return sourceReplicationStatus; + } + protected ReplicationSourceShipper createNewShipper(String walGroupId, PriorityBlockingQueue queue) { return new ReplicationSourceShipper(conf, walGroupId, queue, this); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java index 090b465..78a4a07 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.io.IOException; import java.util.List; +import java.util.Map; import java.util.UUID; import org.apache.hadoop.conf.Configuration; @@ -168,6 +169,14 @@ public interface ReplicationSourceInterface { ServerName getServerWALsBelongTo(); /** + * get the stat of replication for each wal group. + * @return stat of replication + */ + default Map getWalGroupStatus() { + return null; + } + + /** * @return whether this is a replication source for recovery. */ default boolean isRecovered() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index a370867..adf4865 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -392,7 +392,8 @@ public class ReplicationSourceManager implements ReplicationListener { // synchronized on oldsources to avoid race with NodeFailoverWorker synchronized (this.oldsources) { List previousQueueIds = new ArrayList<>(); - for (ReplicationSourceInterface oldSource : this.oldsources) { + for (int i = 0; i < oldsources.size(); i++) { + ReplicationSourceInterface oldSource = oldsources.get(i); if (oldSource.getPeerId().equals(peerId)) { previousQueueIds.add(oldSource.getQueueId()); oldSource.terminate(terminateMessage); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java index 51df46a..0959433 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java @@ -70,6 +70,9 @@ public class ReplicationSourceShipper extends Thread { protected final long sleepForRetries; // Maximum number of retries before taking bold actions protected final int maxRetriesMultiplier; + + private final int DEFAULT_TIMEOUT = 20; + private final int getEntriesTimeout; public ReplicationSourceShipper(Configuration conf, String walGroupId, PriorityBlockingQueue queue, ReplicationSourceInterface source) { @@ -81,6 +84,8 @@ public class ReplicationSourceShipper extends Thread { this.conf.getLong("replication.source.sleepforretries", 1000); // 1 second this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per + this.getEntriesTimeout = + this.conf.getInt("replication.source.getEntries.timeout", DEFAULT_TIMEOUT); // 20 seconds } @Override @@ -96,7 +101,13 @@ public class ReplicationSourceShipper extends Thread { continue; } try { - WALEntryBatch entryBatch = entryReader.take(); + WALEntryBatch entryBatch = entryReader.take(getEntriesTimeout); + if (entryBatch == null) { + //since there is no logs need to replicate, we refresh the ageOfLastShippedOp + source.getSourceMetrics().setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(), + walGroupId); + continue; + } // the NO_MORE_DATA instance has no path so do not all shipEdits if (entryBatch == WALEntryBatch.NO_MORE_DATA) { noMoreData(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java index f685a9b..1fcb9e1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -77,6 +78,8 @@ class ReplicationSourceWALReader extends Thread { private AtomicLong totalBufferUsed; private long totalBufferQuota; + private Path currentLogPath; + /** * Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the * entries, and puts them on a batch queue. @@ -134,6 +137,7 @@ class ReplicationSourceWALReader extends Thread { if (!checkQuota()) { continue; } + updateCurrentLogPath(entryStream.getCurrentPath()); WALEntryBatch batch = readWALEntries(entryStream); currentPosition = entryStream.getPosition(); if (batch != null) { @@ -163,6 +167,19 @@ class ReplicationSourceWALReader extends Thread { } } + private void updateCurrentLogPath(Path currentLogPath){ + if(this.currentLogPath != currentLogPath){ + this.currentLogPath = currentLogPath; + } + } + + /** + * return current log path that the reader opened. + */ + public Path getCurrentLogPath(){ + return this.currentLogPath; + } + // returns true if we reach the size limit for batch, i.e, we need to finish the batch and return. protected final boolean addEntryToBatch(WALEntryBatch batch, Entry entry) { WALEdit edit = entry.getEdit(); @@ -299,6 +316,10 @@ class ReplicationSourceWALReader extends Thread { return entryBatchQueue.take(); } + public WALEntryBatch take(long timeout) throws InterruptedException { + return entryBatchQueue.poll(timeout, TimeUnit.SECONDS); + } + private long getEntrySizeIncludeBulkLoad(Entry entry) { WALEdit edit = entry.getEdit(); WALKey key = entry.getKey(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationStatus.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationStatus.java new file mode 100644 index 0000000..5b76d03 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationStatus.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ +package org.apache.hadoop.hbase.replication.regionserver; + +import org.apache.hadoop.fs.Path; +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Private +public class ReplicationStatus { + private String peerId; + private String walGroup; + private Path currentPath; + private int queueSize; + private long ageOfLastShippedOp; + private long replicationDelay; + private long currentPosition; + private long fileSize; + + public long getCurrentPosition() { + return currentPosition; + } + + public ReplicationStatus setCurrentPosition(long currentPosition) { + this.currentPosition = currentPosition; + return this; + } + + public long getFileSize() { + return fileSize; + } + + public ReplicationStatus setFileSize(long fileSize) { + this.fileSize = fileSize; + return this; + } + + public String getPeerId() { + return peerId; + } + + public ReplicationStatus setPeerId(String peerId) { + this.peerId = peerId; + return this; + } + + public String getWalGroup() { + return walGroup; + } + + public ReplicationStatus setWalGroup(String walGroup) { + this.walGroup = walGroup; + return this; + } + + public int getQueueSize() { + return queueSize; + } + + public ReplicationStatus setQueueSize(int queueSize) { + this.queueSize = queueSize; + return this; + } + + public long getAgeOfLastShippedOp() { + return ageOfLastShippedOp; + } + + public ReplicationStatus setAgeOfLastShippedOp(long ageOfLastShippedOp) { + this.ageOfLastShippedOp = ageOfLastShippedOp; + return this; + } + + public long getReplicationDelay() { + return replicationDelay; + } + + public ReplicationStatus setReplicationDelay(long replicationDelay) { + this.replicationDelay = replicationDelay; + return this; + } + + public Path getCurrentPath() { + return currentPath; + } + + public ReplicationStatus setCurrentPath(Path currentPath) { + this.currentPath = currentPath; + return this; + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationMetricsforUI.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationMetricsforUI.java new file mode 100644 index 0000000..462ddbb --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationMetricsforUI.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import java.io.IOException; +import java.util.Map; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ ReplicationTests.class, MediumTests.class }) +public class TestReplicationMetricsforUI extends TestReplicationBase { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestReplicationMetricsforUI.class); + + @Test + public void testReplicationMetrics() throws IOException { + try (Admin hbaseAdmin = utility1.getConnection().getAdmin()) { + HRegionServer rs = utility1.getRSForFirstRegionInTable(tableName); + Map metrics = rs.getWalGroupsReplicationStatus(); + Assert.assertEquals("metric size ", 1, metrics.size()); + long lastPosition = 0; + for (Map.Entry metric : metrics.entrySet()) { + Assert.assertEquals("peerId", PEER_ID2, metric.getValue().getPeerId()); + Assert.assertEquals("queue length", 1, metric.getValue().getQueueSize()); + Assert.assertEquals("replication delay", 0, metric.getValue().getReplicationDelay()); + Assert.assertTrue("current position", metric.getValue().getCurrentPosition() >= 0); + lastPosition = metric.getValue().getCurrentPosition(); + + } + + final byte[] qualName = Bytes.toBytes("q"); + Put p; + hbaseAdmin.disableReplicationPeer(PEER_ID2); + for (int i = 0; i < NB_ROWS_IN_BATCH; i++) { + p = new Put(Bytes.toBytes("" + Integer.toString(i))); + p.addColumn(famName, qualName, Bytes.toBytes("value help to test replication delay " + i)); + htable1.put(p); + } + Thread.sleep(500); + hbaseAdmin.enableReplicationPeer(PEER_ID2); + rs = utility1.getRSForFirstRegionInTable(tableName); + metrics = rs.getWalGroupsReplicationStatus(); + Path lastPath = null; + for (Map.Entry metric : metrics.entrySet()) { + lastPath = metric.getValue().getCurrentPath(); + Assert.assertEquals("peerId", PEER_ID2, metric.getValue().getPeerId()); + Assert.assertTrue("replication delay should be > 0 ", + metric.getValue().getReplicationDelay() > 0); + Assert.assertTrue("current position", + metric.getValue().getCurrentPosition() - lastPosition >= 0); + } + + hbaseAdmin.rollWALWriter(rs.getServerName()); + runSmallBatchTest(); + Thread.sleep(500); + metrics = rs.getWalGroupsReplicationStatus(); + for (Map.Entry metric : metrics.entrySet()) { + Assert.assertEquals("replication delay", 0, metric.getValue().getReplicationDelay()); + Assert.assertEquals("current position", 0, metric.getValue().getCurrentPosition()); + Assert.assertNotEquals("current path", lastPath, metric.getValue().getCurrentPath()); + } + + } catch (InterruptedException e) { + e.printStackTrace(); + } + + } + +} -- 2.7.4