From ceee91eef282da6eac1c806c879f698981f6e71a Mon Sep 17 00:00:00 2001
From: Wellington Chevreuil
Date: Thu, 15 Nov 2018 11:00:57 +0000
Subject: [PATCH] HBASE-21505 - initial version for more detailed report from
 status replication command

---
 .../org/apache/hadoop/hbase/ServerLoad.java   |  9 +++
 .../apache/hadoop/hbase/ServerMetrics.java    |  6 ++
 .../hadoop/hbase/ServerMetricsBuilder.java    | 25 ++++++-
 .../replication/ReplicationLoadSource.java    | 40 ++++++++++-
 .../hbase/shaded/protobuf/ProtobufUtil.java   | 24 ++++++-
 .../src/main/protobuf/ClusterStatus.proto     | 20 +++++-
 .../src/main/protobuf/ClusterStatus.proto     | 27 ++++++--
 .../hbase/regionserver/HRegionServer.java     |  4 +-
 .../HBaseInterClusterReplicationEndpoint.java |  6 --
 .../regionserver/MetricsSource.java           | 55 ++++++++++++---
 .../replication/regionserver/Replication.java | 21 ++----
 .../regionserver/ReplicationLoad.java         | 69 ++++++++++---------
 .../regionserver/ReplicationSource.java       | 27 +++++++-
 .../ReplicationSourceManager.java             |  5 ++
 .../ReplicationSourceShipper.java             | 15 ++--
 .../ReplicationSourceWALReader.java           |  6 ++
 .../regionserver/WALEntryStream.java          |  1 +
 hbase-shell/src/main/ruby/hbase/admin.rb      | 46 ++++++++++---
 18 files changed, 308 insertions(+), 98 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerLoad.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerLoad.java
index dbf00700b0..f7f3204ac9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerLoad.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerLoad.java
@@ -400,6 +400,15 @@ public class ServerLoad implements ServerMetrics {
     return metrics.getReplicationLoadSourceList();
   }
 
+  /**
+   * Call directly from client such as hbase shell
+   * @return a map of ReplicationLoadSource list per peer id
+   */
+  @Override
+  public Map<String, List<ReplicationLoadSource>> getReplicationLoadSourceMap() {
+    return metrics.getReplicationLoadSourceMap();
+  }
+
   /**
    * Call directly from client such as hbase shell
    * @return ReplicationLoadSink
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetrics.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetrics.java
index 1e1d395e59..391e62ff39 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetrics.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetrics.java
@@ -75,6 +75,12 @@ public interface ServerMetrics {
    */
   List<ReplicationLoadSource> getReplicationLoadSourceList();
 
+  /**
+   * Call directly from client such as hbase shell
+   * @return a map of ReplicationLoadSource list per peer id
+   */
+  Map<String, List<ReplicationLoadSource>> getReplicationLoadSourceMap();
+
   /**
    * Call directly from client such as hbase shell
    * @return ReplicationLoadSink
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetricsBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetricsBuilder.java
index 333344ba52..4ab641f6ef 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetricsBuilder.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetricsBuilder.java
@@ -18,14 +18,20 @@
 package org.apache.hadoop.hbase;
 
 import edu.umd.cs.findbugs.annotations.Nullable;
+
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import javafx.util.Pair;
 import org.apache.hadoop.hbase.replication.ReplicationLoadSink;
 import org.apache.hadoop.hbase.replication.ReplicationLoadSource;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -74,7 +80,7 @@ public final class ServerMetricsBuilder {
         .map(HBaseProtos.Coprocessor::getName).collect(Collectors.toList()))
       .setRegionMetrics(serverLoadPB.getRegionLoadsList().stream()
         .map(RegionMetricsBuilder::toRegionMetrics).collect(Collectors.toList()))
-      .setReplicationLoadSources(serverLoadPB.getReplLoadSourceList().stream()
+      .setReplicationLoadSources(serverLoadPB.getReplLoadSourceMapList().stream()
         .map(ProtobufUtil::toReplicationLoadSource).collect(Collectors.toList()))
       .setReplicationLoadSink(serverLoadPB.hasReplLoadSink()
         ? ProtobufUtil.toReplicationLoadSink(serverLoadPB.getReplLoadSink())
@@ -104,6 +110,9 @@ public final class ServerMetricsBuilder {
         .addAllReplLoadSource(metrics.getReplicationLoadSourceList().stream()
           .map(ProtobufUtil::toReplicationLoadSource)
           .collect(Collectors.toList()))
+        .addAllReplLoadSourceMap(metrics.getReplicationLoadSourceList().stream()
+          .map(ProtobufUtil::toReplicationLoadSourceMapEntry)
+          .collect(Collectors.toList()))
         .setReportStartTime(metrics.getLastReportTimestamp())
         .setReportEndTime(metrics.getReportTimestamp());
     if (metrics.getReplicationLoadSink() != null) {
@@ -301,6 +310,20 @@ public final class ServerMetricsBuilder {
       return Collections.unmodifiableList(sources);
     }
 
+    @Override
+    public Map<String, List<ReplicationLoadSource>> getReplicationLoadSourceMap(){
+      Map<String, List<ReplicationLoadSource>> sourcesMap = new HashMap<>();
+      for(ReplicationLoadSource loadSource : sources){
+        List<ReplicationLoadSource> list = sourcesMap.get(loadSource.getPeerID());
+        if(list==null){
+          list = new ArrayList<>();
+          sourcesMap.put(loadSource.getPeerID(),list);
+        }
+        list.add(loadSource);
+      }
+      return sourcesMap;
+    }
+
     @Override
     public ReplicationLoadSink getReplicationLoadSink() {
       return sink;
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationLoadSource.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationLoadSource.java
index 9e24e225f1..4f6b03fe80 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationLoadSource.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationLoadSource.java
@@ -22,15 +22,29 @@ public class ReplicationLoadSource {
   private final int sizeOfLogQueue;
   private final long timestampOfLastShippedOp;
   private final long replicationLag;
+  private long timeStampOfLastAttemtped;
+  private String queueId;
+  private boolean recovered;
+  private boolean running;
+  private boolean connectedToPeer;
+  private boolean editsSinceRestart;
 
   // TODO: add the builder for this class
   @InterfaceAudience.Private
-  public ReplicationLoadSource(String id, long age, int size, long timestamp, long lag) {
+  public ReplicationLoadSource(String id, long age, int size, long timestamp,
+      long timeStampOfLastAttemtped, long lag, String queueId, boolean recovered, boolean running,
+      boolean connectedToPeer, boolean editsSinceRestart) {
     this.peerID = id;
     this.ageOfLastShippedOp = age;
     this.sizeOfLogQueue = size;
     this.timestampOfLastShippedOp = timestamp;
     this.replicationLag = lag;
+    this.timeStampOfLastAttemtped = timeStampOfLastAttemtped;
+    this.queueId = queueId;
+    this.recovered = recovered;
+    this.running = running;
+    this.connectedToPeer = connectedToPeer;
+    this.editsSinceRestart = editsSinceRestart;
   }
 
   public String getPeerID() {
@@ -61,4 +75,26 @@
   public long getReplicationLag() {
     return this.replicationLag;
   }
-}
+
+  public long getTimeStampOfLastAttemtped() {
+    return this.timeStampOfLastAttemtped;
+  }
+
+  public String getQueueId() {
+    return queueId;
+  }
+
+  public boolean isRecovered() {
+    return recovered;
+  }
+
+  public boolean isRunning() {
+    return running;
+  }
+
+  public boolean isConnectedToPeer() {
+    return connectedToPeer;
+  }
+
+  public boolean hasEditsSinceRestart() { return editsSinceRestart;}
+}
\ No newline at end of file
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index 6548094fc4..866423d763 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -2693,7 +2693,15 @@
   public static ReplicationLoadSource toReplicationLoadSource(
       ClusterStatusProtos.ReplicationLoadSource rls) {
     return new ReplicationLoadSource(rls.getPeerID(), rls.getAgeOfLastShippedOp(),
-      rls.getSizeOfLogQueue(), rls.getTimeStampOfLastShippedOp(), rls.getReplicationLag());
+      rls.getSizeOfLogQueue(), rls.getTimeStampOfLastShippedOp(),
+      rls.getTimeStampOfLastAttemtpedOp(), rls.getReplicationLag(),
+      rls.getQueueId(), rls.getRecovered(), rls.getRunning(),
+      rls.getConnectedToPeer(), rls.getEditsSinceRestart());
+  }
+
+  public static ReplicationLoadSource toReplicationLoadSource(
+      ClusterStatusProtos.ReplicationLoadSourceMapEntry mapEntry) {
+    return toReplicationLoadSource(mapEntry.getValue());
   }
 
   /**
@@ -3202,6 +3210,20 @@
         .setSizeOfLogQueue((int) rls.getSizeOfLogQueue())
         .setTimeStampOfLastShippedOp(rls.getTimestampOfLastShippedOp())
         .setReplicationLag(rls.getReplicationLag())
+        .setQueueId(rls.getQueueId())
+        .setRecovered(rls.isRecovered())
+        .setRunning(rls.isRunning())
+        .setConnectedToPeer(rls.isConnectedToPeer())
+        .setEditsSinceRestart(rls.hasEditsSinceRestart())
+        .setTimeStampOfLastAttemtpedOp(rls.getTimeStampOfLastAttemtped())
+        .build();
+  }
+
+  public static ClusterStatusProtos.ReplicationLoadSourceMapEntry toReplicationLoadSourceMapEntry(
+      ReplicationLoadSource rls) {
+    return ClusterStatusProtos.ReplicationLoadSourceMapEntry.newBuilder()
+      .setKey(rls.getPeerID())
+      .setValue(toReplicationLoadSource(rls))
         .build();
   }
 
diff --git a/hbase-protocol-shaded/src/main/protobuf/ClusterStatus.proto b/hbase-protocol-shaded/src/main/protobuf/ClusterStatus.proto
index d39db362ea..5b7a0a7f69 100644
--- a/hbase-protocol-shaded/src/main/protobuf/ClusterStatus.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/ClusterStatus.proto
@@ -161,6 +161,17 @@ message ReplicationLoadSource {
   required uint32 sizeOfLogQueue = 3;
   required uint64 timeStampOfLastShippedOp = 4;
   required uint64 replicationLag = 5;
+  required uint64 timeStampOfLastAttemtpedOp=6;
+  required string queueId = 7;
+  required bool recovered = 8;
+  required bool running = 9;
+  required bool connectedToPeer = 10;
+  required bool editsSinceRestart = 11;
+}
+
+message ReplicationLoadSourceMapEntry {
+  required string key = 1;
+  required ReplicationLoadSource value = 2;
 }
 
 message ServerLoad {
@@ -206,7 +217,8 @@
   optional uint32 info_server_port = 9;
 
   /**
-   * The replicationLoadSource for the replication Source status of this region server.
+   * Map of replicationLoadSource instances per peer ID
+   * for the replication Source status of this region server.
    */
   repeated ReplicationLoadSource replLoadSource = 10;
 
@@ -214,6 +226,12 @@
    * The replicationLoadSink for the replication Sink status of this region server.
    */
   optional ReplicationLoadSink replLoadSink = 11;
+
+  /**
+   * Map of replicationLoadSource instances per peer ID
+   * for the replication Source status of this region server.
+   */
+  repeated ReplicationLoadSourceMapEntry replLoadSourceMap = 12;
 }
 
 message LiveServerInfo {
diff --git a/hbase-protocol/src/main/protobuf/ClusterStatus.proto b/hbase-protocol/src/main/protobuf/ClusterStatus.proto
index 009d172c36..601e8715bd 100644
--- a/hbase-protocol/src/main/protobuf/ClusterStatus.proto
+++ b/hbase-protocol/src/main/protobuf/ClusterStatus.proto
@@ -157,6 +157,17 @@ message ReplicationLoadSource {
   required uint32 sizeOfLogQueue = 3;
   required uint64 timeStampOfLastShippedOp = 4;
   required uint64 replicationLag = 5;
+  required uint64 timeStampOfLastAttemtpedOp=6;
+  required string queueId = 7;
+  required bool recovered = 8;
+  required bool running = 9;
+  required bool connectedToPeer = 10;
+  required bool editsSinceRestart = 11;
+}
+
+message ReplicationLoadSourceMapEntry {
+  required string key = 1;
+  required ReplicationLoadSource value = 2;
 }
 
 message ServerLoad {
@@ -201,15 +212,23 @@
    */
   optional uint32 info_server_port = 9;
 
-  /**
-   * The replicationLoadSource for the replication Source status of this region server.
-   */
-  repeated ReplicationLoadSource replLoadSource = 10;
+  /**
+   * Map of replicationLoadSource instances per peer ID
+   * for the replication Source status of this region server.
+   */
+  repeated ReplicationLoadSource replLoadSource = 10;
+
   /**
    * The replicationLoadSink for the replication Sink status of this region server.
    */
   optional ReplicationLoadSink replLoadSink = 11;
+
+  /**
+   * Map of replicationLoadSource instances per peer ID
+   * for the replication Source status of this region server.
+   */
+  repeated ReplicationLoadSourceMapEntry replLoadSourceMap = 12;
 }
 
 message LiveServerInfo {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 24743b9126..1ab754a445 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -1387,8 +1387,8 @@ public class HRegionServer extends HasThread implements
       ReplicationLoad rLoad = rsources.refreshAndGetReplicationLoad();
       if (rLoad != null) {
         serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
-        for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad.getReplicationLoadSourceList()) {
-          serverLoad.addReplLoadSource(rLS);
+        for (ClusterStatusProtos.ReplicationLoadSourceMapEntry rLS : rLoad.getReplicationLoadSourceMapEntries()) {
+          serverLoad.addReplLoadSourceMap(rLS);
         }
       }
     }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
index 7db53aa7c1..bd9324d883 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -376,14 +376,8 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
       // replicate the batches to sink side.
       lastWriteTime = parallelReplicate(pool, replicateContext, batches);
 
-      // update metrics
-      if (lastWriteTime > 0) {
-        this.metrics.setAgeOfLastShippedOp(lastWriteTime, walGroupId);
-      }
       return true;
     } catch (IOException ioe) {
-      // Didn't ship anything, but must still age the last time we did
-      this.metrics.refreshAgeOfLastShippedOp(walGroupId);
       if (ioe instanceof RemoteException) {
         ioe = ((RemoteException) ioe).unwrapRemoteException();
         LOG.warn("Can't replicate because of an error on the remote cluster: ", ioe);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
index 830ebe182d..679018dae2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
@@ -41,7 +41,9 @@ public class MetricsSource implements BaseSource {
   private static final Logger LOG = LoggerFactory.getLogger(MetricsSource.class);
 
   // tracks last shipped timestamp for each wal group
-  private Map<String, Long> lastTimestamps = new HashMap<>();
+  private Map<String, Long> lastShippedTimeStamps = new HashMap<String, Long>();
+  // tracks last attempted timestamp for each wal group
+  private Map<String, Long> lastAttemptedTimeStamps = new HashMap<String, Long>();
   private Map<String, Long> ageOfLastShippedOp = new HashMap<>();
   private long lastHFileRefsQueueSize = 0;
   private String id;
@@ -81,7 +83,7 @@
 
   /**
    * Set the age of the last edit that was shipped
-   * @param timestamp write time of the edit
+   * @param timestamp target write time of the edit
    * @param walGroup which group we are setting
    */
   public void setAgeOfLastShippedOp(long timestamp, String walGroup) {
@@ -89,7 +91,17 @@ public class MetricsSource implements BaseSource {
     singleSourceSource.setLastShippedAge(age);
     globalSourceSource.setLastShippedAge(age);
     this.ageOfLastShippedOp.put(walGroup, age);
-    this.lastTimestamps.put(walGroup, timestamp);
+    this.lastShippedTimeStamps.put(walGroup, timestamp);
+  }
+
+  /**
+   * Set the timestamp the current edit being replicated was written in source,
+   * in order to accurately calculate lags.
+   * @param timestamp source write time of the edit
+   * @param walGroup which group we are setting
+   */
+  public synchronized void setTimeStampOfLastAttempted(long timestamp, String walGroup) {
+    this.lastAttemptedTimeStamps.put(walGroup, timestamp);
   }
 
   /**
@@ -111,7 +123,7 @@
    * @return timeStamp
    */
   public long getLastTimeStampOfWalGroup(String walGroup) {
-    return this.lastTimestamps.get(walGroup) == null ? 0 : lastTimestamps.get(walGroup);
+    return this.lastAttemptedTimeStamps.get(walGroup) == null ? 0 : lastAttemptedTimeStamps.get(walGroup);
   }
 
   /**
@@ -129,9 +141,9 @@
    * @param walGroupId id of the group to update
    */
   public void refreshAgeOfLastShippedOp(String walGroupId) {
-    Long lastTimestamp = this.lastTimestamps.get(walGroupId);
+    Long lastTimestamp = this.lastShippedTimeStamps.get(walGroupId);
     if (lastTimestamp == null) {
-      this.lastTimestamps.put(walGroupId, 0L);
+      this.lastShippedTimeStamps.put(walGroupId, 0L);
       lastTimestamp = 0L;
     }
     if (lastTimestamp > 0) {
@@ -223,7 +235,8 @@
     singleSourceSource.decrSizeOfLogQueue(lastQueueSize);
     singleSourceSource.clear();
     globalSourceSource.decrSizeOfHFileRefsQueue(lastHFileRefsQueueSize);
-    lastTimestamps.clear();
+    lastShippedTimeStamps.clear();
+    lastAttemptedTimeStamps.clear();
     lastHFileRefsQueueSize = 0;
   }
 
@@ -260,7 +273,7 @@
    */
   public long getTimestampOfLastShippedOp() {
     long lastTimestamp = 0L;
-    for (long ts : lastTimestamps.values()) {
+    for (long ts : lastShippedTimeStamps.values()) {
       if (ts > lastTimestamp) {
         lastTimestamp = ts;
       }
@@ -268,6 +281,32 @@
     return lastTimestamp;
   }
 
+  /**
+   * Get map of all timestamps for edits that arrived since last restart of RS.
+   * @return map containing timestamps of last arrived edit for each wal group
+   */
+  public Map<String, Long> getLastAttemptedTimeStamps() {
+    return lastAttemptedTimeStamps;
+  }
+
+  /**
+   * Get timeStampOfLastAttemtpedOp, if there are multiple groups, return the oldest one
+   * @return timeStampOfLastAttemtpedOp
+   */
+  public long getTimeStampOfOldestAttemtpedOp(){
+    long lastTimestamp = EnvironmentEdgeManager.currentTime();
+    synchronized (this) {
+      for (long ts : lastAttemptedTimeStamps.values()) {
+        if (ts < lastTimestamp) {
+          lastTimestamp = ts;
+        }
+      }
+    }
+    return lastTimestamp;
+  }
+
+
+
   /**
    * Get the slave peer ID
    * @return peerID
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index 799d9750ed..9a449ad21c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -274,25 +274,12 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
   }
 
   private void buildReplicationLoad() {
-    List<MetricsSource> sourceMetricsList = new ArrayList<>();
-
-    // get source
-    List<ReplicationSourceInterface> sources = this.replicationManager.getSources();
-    for (ReplicationSourceInterface source : sources) {
-      sourceMetricsList.add(source.getSourceMetrics());
-    }
-
-    // get old source
-    List<ReplicationSourceInterface> oldSources = this.replicationManager.getOldSources();
-    for (ReplicationSourceInterface source : oldSources) {
-      if (source instanceof ReplicationSource) {
-        sourceMetricsList.add(source.getSourceMetrics());
-      }
-    }
-
+    List<ReplicationSourceInterface> allSources = new ArrayList<>();
+    allSources.addAll(this.replicationManager.getSources());
+    allSources.addAll(this.replicationManager.getOldSources());
     // get sink
     MetricsSink sinkMetrics = this.replicationSink.getSinkMetrics();
-    this.replicationLoad.buildReplicationLoad(sourceMetricsList, sinkMetrics);
+    this.replicationLoad.buildReplicationLoad(allSources, sinkMetrics);
   }
 
   @Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java
index 53e560b0d5..b007ffa9d9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java
@@ -37,11 +37,9 @@ public class ReplicationLoad {
 
   // Empty load instance.
   public static final ReplicationLoad EMPTY_REPLICATIONLOAD = new ReplicationLoad();
-
-  private List<MetricsSource> sourceMetricsList;
   private MetricsSink sinkMetrics;
 
-  private List<ClusterStatusProtos.ReplicationLoadSource> replicationLoadSourceList;
+  private List<ClusterStatusProtos.ReplicationLoadSourceMapEntry> replicationLoadSourceMapEntries;
   private ClusterStatusProtos.ReplicationLoadSink replicationLoadSink;
 
   /** default constructor */
@@ -51,13 +49,13 @@ public class ReplicationLoad {
 
   /**
    * buildReplicationLoad
    * @param srMetricsList
    * @param skMetrics
    */
-  public void buildReplicationLoad(final List<MetricsSource> srMetricsList,
+  public void buildReplicationLoad(final List<ReplicationSourceInterface> sources,
       final MetricsSink skMetrics) {
-    this.sourceMetricsList = srMetricsList;
+//    this.sourceMetricsList = srMetricsList;
     this.sinkMetrics = skMetrics;
 
     // build the SinkLoad
     ClusterStatusProtos.ReplicationLoadSink.Builder rLoadSinkBuild =
@@ -67,10 +65,9 @@
     rLoadSinkBuild.setTimeStampsOfLastAppliedOp(sinkMetrics.getTimestampOfLastAppliedOp());
     this.replicationLoadSink = rLoadSinkBuild.build();
 
-    // build the SourceLoad List
-    Map<String, ClusterStatusProtos.ReplicationLoadSource> replicationLoadSourceMap =
-        new HashMap<>();
-    for (MetricsSource sm : this.sourceMetricsList) {
+    this.replicationLoadSourceMapEntries = new ArrayList<>();
+    for (ReplicationSourceInterface source : sources) {
+      MetricsSource sm = source.getSourceMetrics();
       // Get the actual peer id
       String peerId = sm.getPeerID();
       String[] parts = peerId.split("-", 2);
@@ -79,16 +76,12 @@
       long ageOfLastShippedOp = sm.getAgeOfLastShippedOp();
       int sizeOfLogQueue = sm.getSizeOfLogQueue();
       long timeStampOfLastShippedOp = sm.getTimestampOfLastShippedOp();
-      long replicationLag =
-          calculateReplicationDelay(ageOfLastShippedOp, timeStampOfLastShippedOp, sizeOfLogQueue);
-
-      ClusterStatusProtos.ReplicationLoadSource rLoadSource = replicationLoadSourceMap.get(peerId);
-      if (rLoadSource != null) {
-        ageOfLastShippedOp = Math.max(rLoadSource.getAgeOfLastShippedOp(), ageOfLastShippedOp);
-        sizeOfLogQueue += rLoadSource.getSizeOfLogQueue();
-        timeStampOfLastShippedOp = Math.min(rLoadSource.getTimeStampOfLastShippedOp(),
-          timeStampOfLastShippedOp);
-        replicationLag = Math.max(rLoadSource.getReplicationLag(), replicationLag);
+      long replicationLag;
+      long timeStampOfOldestAttemtpedOp = sm.getTimeStampOfOldestAttemtpedOp();
+      if(timeStampOfLastShippedOp>=timeStampOfOldestAttemtpedOp){
+        replicationLag = 0L;
+      }else{
+        replicationLag=EnvironmentEdgeManager.currentTime() - timeStampOfOldestAttemtpedOp;
       }
       ClusterStatusProtos.ReplicationLoadSource.Builder rLoadSourceBuild =
           ClusterStatusProtos.ReplicationLoadSource.newBuilder();
@@ -97,10 +90,24 @@
       rLoadSourceBuild.setSizeOfLogQueue(sizeOfLogQueue);
       rLoadSourceBuild.setTimeStampOfLastShippedOp(timeStampOfLastShippedOp);
       rLoadSourceBuild.setReplicationLag(replicationLag);
+      rLoadSourceBuild.setTimeStampOfLastAttemtpedOp(timeStampOfOldestAttemtpedOp);
+      if (source instanceof ReplicationSource){
+        ReplicationSource replSource = (ReplicationSource)source;
+        rLoadSourceBuild.setRecovered(replSource.getReplicationQueueInfo().isQueueRecovered());
+        rLoadSourceBuild.setQueueId(replSource.getReplicationQueueInfo().getQueueId());
+        rLoadSourceBuild.setRunning(replSource.isWorkerRunning());
+        rLoadSourceBuild.setConnectedToPeer(replSource.getPeerClusterUUID()!=null);
+        rLoadSourceBuild.setEditsSinceRestart(!replSource.getSourceMetrics()
+            .getLastAttemptedTimeStamps().isEmpty());
+      }
 
-      replicationLoadSourceMap.put(peerId, rLoadSourceBuild.build());
+      ClusterStatusProtos.ReplicationLoadSourceMapEntry.Builder rLoadSourceBuildEntry =
+          ClusterStatusProtos.ReplicationLoadSourceMapEntry.newBuilder();
+      rLoadSourceBuildEntry.setKey(peerId);
+      rLoadSourceBuildEntry.setValue(rLoadSourceBuild.build());
+
+      this.replicationLoadSourceMapEntries.add(rLoadSourceBuildEntry.build());
     }
-    this.replicationLoadSourceList = new ArrayList<>(replicationLoadSourceMap.values());
   }
 
   static long calculateReplicationDelay(long ageOfLastShippedOp,
@@ -131,19 +138,17 @@
    * @return a string contains sourceReplicationLoad information
    */
   public String sourceToString() {
-    if (this.sourceMetricsList == null) return null;
-
     StringBuilder sb = new StringBuilder();
-    for (ClusterStatusProtos.ReplicationLoadSource rls : this.replicationLoadSourceList) {
+    for (ClusterStatusProtos.ReplicationLoadSourceMapEntry rls : this.replicationLoadSourceMapEntries) {
 
-      sb = Strings.appendKeyValue(sb, "\n PeerID", rls.getPeerID());
-      sb = Strings.appendKeyValue(sb, "AgeOfLastShippedOp", rls.getAgeOfLastShippedOp());
-      sb = Strings.appendKeyValue(sb, "SizeOfLogQueue", rls.getSizeOfLogQueue());
+      sb = Strings.appendKeyValue(sb, "\n PeerID", rls.getKey());
+      sb = Strings.appendKeyValue(sb, "AgeOfLastShippedOp", rls.getValue().getAgeOfLastShippedOp());
+      sb = Strings.appendKeyValue(sb, "SizeOfLogQueue", rls.getValue().getSizeOfLogQueue());
       sb = Strings.appendKeyValue(sb, "TimestampsOfLastShippedOp",
-        (new Date(rls.getTimeStampOfLastShippedOp()).toString()));
-      sb = Strings.appendKeyValue(sb, "Replication Lag", rls.getReplicationLag());
+        (new Date(rls.getValue().getTimeStampOfLastShippedOp()).toString()));
+      sb = Strings.appendKeyValue(sb, "Replication Lag", rls.getValue().getReplicationLag());
     }
 
     return sb.toString();
   }
 
@@ -171,8 +176,8 @@
     return this.replicationLoadSink;
   }
 
-  public List<ClusterStatusProtos.ReplicationLoadSource> getReplicationLoadSourceList() {
-    return this.replicationLoadSourceList;
+  public List<ClusterStatusProtos.ReplicationLoadSourceMapEntry> getReplicationLoadSourceMapEntries() {
+    return this.replicationLoadSourceMapEntries;
   }
 
   /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 10fa50f553..ccd561a1f9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -179,7 +179,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
     this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0);
     this.totalBufferUsed = manager.getTotalBufferUsed();
     this.walFileLengthProvider = walFileLengthProvider;
-    LOG.info("queueId={}, ReplicationSource : {}, currentBandwidth={}", queueId,
+    LOG.info("queueId={}, ReplicationSource: {}, currentBandwidth={}", queueId,
       replicationPeer.getId(), this.currentBandwidth);
   }
 
@@ -209,7 +209,8 @@
     } else {
       queue.put(log);
     }
-
+    LOG.trace("Added log file {} to queue of source {}.", logPrefix,
+      this.replicationQueueInfo.getQueueId());
     this.metrics.incrSizeOfLogQueue();
     // This will log a warning for each new log that gets created above the warn threshold
     int queueSize = queue.size();
@@ -476,6 +477,8 @@
     for (;;) {
       peerClusterId = replicationEndpoint.getPeerUUID();
       if (this.isSourceActive() && peerClusterId == null) {
+        LOG.debug("Could not connect to Peer ZK. Sleeping for "
+          + (this.sleepForRetries * sleepMultiplier) + " millis.");
         if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
           sleepMultiplier++;
         }
@@ -493,7 +496,8 @@
       this.manager.removeSource(this);
       return;
     }
-    LOG.info("Replicating " + clusterId + " -> " + peerClusterId);
+    LOG.info("Source: {}, is now replicating from cluster: {}; to peer cluster: {};",
+      this.replicationQueueInfo.getQueueId(), clusterId, peerClusterId);
     initializeWALEntryFilter(peerClusterId);
 
     // start workers
@@ -608,6 +612,10 @@
     return !this.server.isStopped() && this.sourceRunning;
   }
 
+  public UUID getPeerClusterUUID(){
+    return this.clusterId;
+  }
+
   /**
    * Comparator used to compare logs together based on their start time
    */
@@ -633,6 +641,19 @@
     }
   }
 
+  public ReplicationQueueInfo getReplicationQueueInfo() {
+    return replicationQueueInfo;
+  }
+
+  public boolean isWorkerRunning(){
+    for( ReplicationSourceShipper worker : this.workerThreads.values()){
+      if(worker.isActive()){
+        return worker.isActive();
+      }
+    }
+    return false;
+  }
+
   @Override
   public String getStats() {
     StringBuilder sb = new StringBuilder();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 20c1215950..781edcf7d0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -381,6 +381,7 @@ public class ReplicationSourceManager implements ReplicationListener {
         abortAndThrowIOExceptionWhenFail(
           () -> this.queueStorage.addWAL(server.getServerName(), peerId, walPath.getName()));
         src.enqueueLog(walPath);
+        LOG.trace("Enqueued {} to source {} during source creation.", walPath, src.getQueueId());
       }
     }
   }
@@ -789,6 +790,8 @@
     // This only updates the sources we own, not the recovered ones
     for (ReplicationSourceInterface source : this.sources.values()) {
       source.enqueueLog(newLog);
+      LOG.trace("Enqueued {} to source {} while performing postLogRoll operation.",
+        newLog, source.getQueueId());
     }
   }
 
@@ -960,7 +963,9 @@
           wals.add(wal);
         }
         oldsources.add(src);
+        LOG.info("Added source for recovered queue: " + src.getQueueId());
         for (String wal : walsSet) {
+          LOG.info("Enqueueing log from recovered queue for source: " + src.getQueueId());
           src.enqueueLog(new Path(oldLogDir, wal));
         }
         src.startup();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
index 5d6198e96e..03f53fc899 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
@@ -92,6 +92,7 @@ public class ReplicationSourceShipper extends Thread {
   @Override
   public final void run() {
     setWorkerState(WorkerState.RUNNING);
+    LOG.info("Running ReplicationSourceShipper Thread for wal group: {}", this.walGroupId);
     // Loop until we close down
     while (isActive()) {
       // Sleep until replication is enabled again
@@ -103,10 +104,9 @@
       }
       try {
         WALEntryBatch entryBatch = entryReader.poll(getEntriesTimeout);
+        LOG.trace("Shipper from source {} got entry batch from reader: {}",
+          source.getQueueId(), entryBatch);
         if (entryBatch == null) {
-          // since there is no logs need to replicate, we refresh the ageOfLastShippedOp
-          source.getSourceMetrics().setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(),
-            walGroupId);
           continue;
         }
         // the NO_MORE_DATA instance has no path so do not call shipEdits
@@ -163,12 +163,7 @@
     List<Entry> entries = entryBatch.getWalEntries();
     int sleepMultiplier = 0;
     if (entries.isEmpty()) {
-      if (updateLogPosition(entryBatch)) {
-        // if there was nothing to ship and it's not an error
-        // set "ageOfLastShippedOp" to to indicate that we're current
-        source.getSourceMetrics().setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(),
-          walGroupId);
-      }
+      updateLogPosition(entryBatch);
       return;
     }
     int currentSize = (int) entryBatch.getHeapSize();
@@ -309,7 +304,7 @@
     return 0;
   }
 
-  private boolean isActive() {
+  public boolean isActive() {
     return source.isSourceActive() && state == WorkerState.RUNNING && !isInterrupted();
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index b3bdb02940..074199d4c5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -170,6 +170,12 @@ class ReplicationSourceWALReader extends Thread {
     if (edit == null || edit.isEmpty()) {
       return false;
     }
+    LOG.trace("updating TimeStampOfLastAttempted to {}, from entry {}, for source queue: {}",
+      entry.getKey().getWriteTime(), entry.getKey(), this.source.getQueueId());
+    //sets the time of last attempted to be replicated for this edit
+    String logPrefix = this.logQueue.peek().getName();
+    this.source.getSourceMetrics().setTimeStampOfLastAttempted(
+      entry.getKey().getWriteTime(), logPrefix);
     long entrySize = getEntrySizeIncludeBulkLoad(entry);
     long entrySizeExcludeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
     batch.addEntry(entry);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
index 0393af4970..cb25629768 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
@@ -173,6 +173,7 @@ class WALEntryStream implements Closeable {
   private void tryAdvanceEntry() throws IOException {
     if (checkReader()) {
       boolean beingWritten = readNextEntryAndRecordReaderPosition();
+      LOG.trace("reading wal file {}. Current open for write: {}", this.currentPath, beingWritten);
       if (currentEntry == null && !beingWritten) {
         // no more entries in this log file, and the file is already closed, i.e, rolled
         // Before dequeueing, we should always get one more attempt at reading.
diff --git a/hbase-shell/src/main/ruby/hbase/admin.rb b/hbase-shell/src/main/ruby/hbase/admin.rb
index 898feae8e9..b93ddc0b7e 100644
--- a/hbase-shell/src/main/ruby/hbase/admin.rb
+++ b/hbase-shell/src/main/ruby/hbase/admin.rb
@@ -789,24 +789,48 @@ module Hbase
       puts(format('%d live servers', status.getServersSize))
       for server in status.getServers
         sl = status.getLoad(server)
-        rSinkString = ' SINK :'
+        rSinkString = " SINK:"
         rSourceString = ' SOURCE:'
         rLoadSink = sl.getReplicationLoadSink
         next if rLoadSink.nil?
         rSinkString << ' AgeOfLastAppliedOp=' + rLoadSink.getAgeOfLastAppliedOp.to_s
         rSinkString << ', TimeStampsOfLastAppliedOp=' +
                        java.util.Date.new(rLoadSink.getTimeStampsOfLastAppliedOp).toString
-        rLoadSourceList = sl.getReplicationLoadSourceList
+        rLoadSourceMap = sl.getReplicationLoadSourceMap()
         index = 0
-        while index < rLoadSourceList.size
-          rLoadSource = rLoadSourceList.get(index)
-          rSourceString << ' PeerID=' + rLoadSource.getPeerID
-          rSourceString << ', AgeOfLastShippedOp=' + rLoadSource.getAgeOfLastShippedOp.to_s
-          rSourceString << ', SizeOfLogQueue=' + rLoadSource.getSizeOfLogQueue.to_s
-          rSourceString << ', TimeStampsOfLastShippedOp=' +
-                           java.util.Date.new(rLoadSource.getTimeStampOfLastShippedOp).toString
-          rSourceString << ', Replication Lag=' + rLoadSource.getReplicationLag.to_s
-          index += 1
+        for peer in rLoadSourceMap.keySet()
+          rSourceString << " PeerID=" + peer
+          for sourceLoad in rLoadSourceMap.get(peer)
+            if sourceLoad.isRecovered()
+              rSourceString << "\n Recovered Queue: "
+            else
+              rSourceString << "\n Normal Queue: "
+            end
+            rSourceString << sourceLoad.getQueueId()
+            if (sourceLoad.isRunning())
+              if (sourceLoad.getTimeStampOfLastShippedOp() == 0)
+                rSourceString << "\n No Ops shipped since last restart"
+              else
+                rSourceString << "\n AgeOfLastShippedOp=" + sourceLoad.getAgeOfLastShippedOp().to_s
+                rSourceString << ", TimeStampOfLastShippedOp=" +
+                  (java.util.Date.new(sourceLoad.getTimeStampOfLastShippedOp())).toString()
+              end
+              rSourceString << ", SizeOfLogQueue=" + sourceLoad.getSizeOfLogQueue().to_s
+              if (sourceLoad.isConnectedToPeer())
+                if (sourceLoad.hasEditsSinceRestart())
+                  rSourceString << ", TimeStampOfLastArrivedInSource=" +
+                    (java.util.Date.new(sourceLoad.getTimeStampOfLastAttemtped())).toString()
+                else
+                  rSourceString << ", No edits for this source since it started"
+                end
+                rSourceString << ", Replication Lag=" + sourceLoad.getReplicationLag().to_s
+              else
+                rSourceString << ", still trying to connect to peer ZooKeeper."
+              end
+            else
+              rSourceString << "\n Not running yet."
+            end
+          end
         end
         puts(format(' %s:', server.getHostname))
         if type.casecmp('SOURCE') == 0
-- 
2.17.2 (Apple Git-113)
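
Usage note (not part of the patch above): the per-peer map that this change surfaces through
ServerLoad#getReplicationLoadSourceMap() can be read from a Java client in the same way the
shell code does. The sketch below is illustrative only and rests on assumptions: it uses the
HBase 2.x Admin#getClusterStatus()/ClusterStatus#getLoad() accessors (deprecated but present at
the time of this patch) together with the getters introduced here, and the class name
ReplicationStatusExample is hypothetical. Adjust to the client API of your HBase version.

    import java.util.List;
    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.ClusterStatus;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.ServerLoad;
    import org.apache.hadoop.hbase.ServerName;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.replication.ReplicationLoadSource;

    public class ReplicationStatusExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Admin admin = conn.getAdmin()) {
          ClusterStatus status = admin.getClusterStatus();
          for (ServerName server : status.getServers()) {
            ServerLoad load = status.getLoad(server);
            // One entry per peer id; each value lists the loads of that peer's source queues.
            Map<String, List<ReplicationLoadSource>> byPeer = load.getReplicationLoadSourceMap();
            for (Map.Entry<String, List<ReplicationLoadSource>> peerEntry : byPeer.entrySet()) {
              for (ReplicationLoadSource src : peerEntry.getValue()) {
                System.out.println(server + " peer=" + peerEntry.getKey()
                    + " queue=" + src.getQueueId()
                    + " recovered=" + src.isRecovered()
                    + " running=" + src.isRunning()
                    + " lag=" + src.getReplicationLag());
              }
            }
          }
        }
      }
    }

The same fields drive the reworked `status 'replication'` shell output in admin.rb, which now
groups queues under their peer and distinguishes recovered from normal queues.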