From 2ac72d88b58b0e2916d73d321dfacdf6601a7dd9 Mon Sep 17 00:00:00 2001 From: Wellington Chevreuil Date: Thu, 15 Nov 2018 11:00:57 +0000 Subject: [PATCH] HBASE-21505 - proposal for a more consistent report on status of replication sources running on source cluster. Some of "status 'replication'" command issues currently addressed in this patch: 1) TimeStampsOfLastShippedOp keeps getting updated and increasing even when no new edits were added to source, so nothing was really shipped 2) When replication is stuck due some connectivity issues or target unavailability, if new edits are added in source, reported AgeOfLastShippedOp is wrongly showing same value as "Replication Lag". 3) AgeOfLastShippedOp gets set to 0 even when a given edit had taken some time before it got finally shipped to target. 4) When replication is stuck due some connectivity issues or target unavailability, if RS is restarted, once recovered queue source is started, TimeStampsOfLastShippedOp is set to initial java date (Thu Jan 01 01:00:00 GMT 1970, for example), thus "Replication Lag" also gives a complete inaccurate value. This patch modifies current "status 'replication'" command as follows: 1) Each running source (normal and additional recovery queues) will be listed individually in the output, whilst current behaviour tries to aggregate information in a single one; 2) AgeOfLastShippedOp metric will not be shown if no OP was actually replicated, an explicit description message will be displayed instead. Currently, it simply shows 0 and can lead to wrong conclusion that replication is operating properly. 3) TimeStampsOfLastShippedOp metric will not be shown if no OP was actually replicated, an explicit description message will be displayed instead. Currently, it shows RS startup time, which is also confusing. 4) New metric TimeStampOfNextToReplicate was added to help calculate replication lags and easily spot for how long replication has been stuck. 
This shows time stamp of next edit pending replication. 5) New metrics EditsReadFromLogQueue and OpsShippedToTarget were added to help determine the delta (in # of OPs) between source and target. 6) AgeOfLastShippedOp is now calculated as the difference between the time the given edit was added to source's WAL and time it got actually shipped, so this can show non 0 values even when replication is not stuck. This is useful to give a sense of "how fast" replication is going. 7) Replication Lag now calculated as (CurrentTime - TimeStampOfNextToReplicate) if AgeOfLastShippedOp < TimeStampOfNextToReplicate. --- .../org/apache/hadoop/hbase/ServerLoad.java | 9 + .../apache/hadoop/hbase/ServerMetrics.java | 6 + .../hadoop/hbase/ServerMetricsBuilder.java | 23 +- .../replication/ReplicationLoadSource.java | 48 +++- .../hbase/shaded/protobuf/ProtobufUtil.java | 25 ++- .../MetricsReplicationSourceSource.java | 3 + .../MetricsReplicationGlobalSourceSource.java | 12 + .../MetricsReplicationSourceSourceImpl.java | 12 + .../src/main/protobuf/ClusterStatus.proto | 21 +- .../src/main/protobuf/ClusterStatus.proto | 29 ++- .../hbase/regionserver/HRegionServer.java | 5 +- .../HBaseInterClusterReplicationEndpoint.java | 6 - .../regionserver/MetricsSource.java | 75 +++++-- .../replication/regionserver/Replication.java | 21 +- .../regionserver/ReplicationLoad.java | 93 +++----- .../regionserver/ReplicationSource.java | 33 ++- .../ReplicationSourceManager.java | 5 + .../ReplicationSourceShipper.java | 23 +- .../ReplicationSourceWALReader.java | 8 +- .../regionserver/WALEntryStream.java | 2 + .../replication/TestReplicationBase.java | 27 ++- .../replication/TestReplicationStatus.java | 206 +++++++++++++++++- hbase-shell/src/main/ruby/hbase/admin.rb | 44 +++- 23 files changed, 583 insertions(+), 153 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerLoad.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerLoad.java index dbf00700b0..f7f3204ac9 
100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerLoad.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerLoad.java @@ -400,6 +400,15 @@ public class ServerLoad implements ServerMetrics { return metrics.getReplicationLoadSourceList(); } + /** + * Call directly from client such as hbase shell + * @return a map of ReplicationLoadSource list per peer id + */ + @Override + public Map> getReplicationLoadSourceMap() { + return metrics.getReplicationLoadSourceMap(); + } + /** * Call directly from client such as hbase shell * @return ReplicationLoadSink diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetrics.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetrics.java index 1e1d395e59..391e62ff39 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetrics.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetrics.java @@ -75,6 +75,12 @@ public interface ServerMetrics { */ List getReplicationLoadSourceList(); + /** + * Call directly from client such as hbase shell + * @return a map of ReplicationLoadSource list per peer id + */ + Map> getReplicationLoadSourceMap(); + /** * Call directly from client such as hbase shell * @return ReplicationLoadSink diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetricsBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetricsBuilder.java index 333344ba52..5644b9d6f5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetricsBuilder.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetricsBuilder.java @@ -18,14 +18,18 @@ package org.apache.hadoop.hbase; import edu.umd.cs.findbugs.annotations.Nullable; + +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; import java.util.TreeSet; import 
java.util.stream.Collectors; + import org.apache.hadoop.hbase.replication.ReplicationLoadSink; import org.apache.hadoop.hbase.replication.ReplicationLoadSource; import org.apache.hadoop.hbase.util.Bytes; @@ -74,7 +78,7 @@ public final class ServerMetricsBuilder { .map(HBaseProtos.Coprocessor::getName).collect(Collectors.toList())) .setRegionMetrics(serverLoadPB.getRegionLoadsList().stream() .map(RegionMetricsBuilder::toRegionMetrics).collect(Collectors.toList())) - .setReplicationLoadSources(serverLoadPB.getReplLoadSourceList().stream() + .setReplicationLoadSources(serverLoadPB.getReplLoadSourceMapList().stream() .map(ProtobufUtil::toReplicationLoadSource).collect(Collectors.toList())) .setReplicationLoadSink(serverLoadPB.hasReplLoadSink() ? ProtobufUtil.toReplicationLoadSink(serverLoadPB.getReplLoadSink()) @@ -104,6 +108,9 @@ public final class ServerMetricsBuilder { .addAllReplLoadSource(metrics.getReplicationLoadSourceList().stream() .map(ProtobufUtil::toReplicationLoadSource) .collect(Collectors.toList())) + .addAllReplLoadSourceMap(metrics.getReplicationLoadSourceList().stream() + .map(ProtobufUtil::toReplicationLoadSourceMapEntry) + .collect(Collectors.toList())) .setReportStartTime(metrics.getLastReportTimestamp()) .setReportEndTime(metrics.getReportTimestamp()); if (metrics.getReplicationLoadSink() != null) { @@ -301,6 +308,20 @@ public final class ServerMetricsBuilder { return Collections.unmodifiableList(sources); } + @Override + public Map> getReplicationLoadSourceMap(){ + Map> sourcesMap = new HashMap<>(); + for(ReplicationLoadSource loadSource : sources){ + List list = sourcesMap.get(loadSource.getPeerID()); + if(list==null){ + list = new ArrayList<>(); + sourcesMap.put(loadSource.getPeerID(),list); + } + list.add(loadSource); + } + return sourcesMap; + } + @Override public ReplicationLoadSink getReplicationLoadSink() { return sink; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationLoadSource.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationLoadSource.java index 9e24e225f1..237646a0c7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationLoadSource.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationLoadSource.java @@ -22,15 +22,31 @@ public class ReplicationLoadSource { private final int sizeOfLogQueue; private final long timestampOfLastShippedOp; private final long replicationLag; + private long timeStampOfNextToReplicate; + private String queueId; + private boolean recovered; + private boolean running; + private boolean editsSinceRestart; + private long editsRead; + private long oPsShipped; // TODO: add the builder for this class @InterfaceAudience.Private - public ReplicationLoadSource(String id, long age, int size, long timestamp, long lag) { + public ReplicationLoadSource(String id, long age, int size, long timestamp, + long timeStampOfNextToReplicate, long lag, String queueId, boolean recovered, boolean running, + boolean editsSinceRestart, long editsRead, long oPsShipped) { this.peerID = id; this.ageOfLastShippedOp = age; this.sizeOfLogQueue = size; this.timestampOfLastShippedOp = timestamp; this.replicationLag = lag; + this.timeStampOfNextToReplicate = timeStampOfNextToReplicate; + this.queueId = queueId; + this.recovered = recovered; + this.running = running; + this.editsSinceRestart = editsSinceRestart; + this.editsRead = editsRead; + this.oPsShipped = oPsShipped; } public String getPeerID() { @@ -61,4 +77,32 @@ public class ReplicationLoadSource { public long getReplicationLag() { return this.replicationLag; } -} + + public long getTimeStampOfNextToReplicate() { + return this.timeStampOfNextToReplicate; + } + + public String getQueueId() { + return queueId; + } + + public boolean isRecovered() { + return recovered; + } + + public boolean isRunning() { + return running; + } + + public boolean hasEditsSinceRestart() { + return editsSinceRestart; + } + 
+ public long getEditsRead() { + return editsRead; + } + + public long getOPsShipped() { + return oPsShipped; + } +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java index 6548094fc4..ce51936bb2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java @@ -2693,7 +2693,15 @@ public final class ProtobufUtil { public static ReplicationLoadSource toReplicationLoadSource( ClusterStatusProtos.ReplicationLoadSource rls) { return new ReplicationLoadSource(rls.getPeerID(), rls.getAgeOfLastShippedOp(), - rls.getSizeOfLogQueue(), rls.getTimeStampOfLastShippedOp(), rls.getReplicationLag()); + rls.getSizeOfLogQueue(), rls.getTimeStampOfLastShippedOp(), + rls.getTimeStampOfNextToReplicate(), rls.getReplicationLag(), + rls.getQueueId(), rls.getRecovered(), rls.getRunning(), + rls.getEditsSinceRestart(), rls.getEditsRead(), rls.getOPsShipped()); + } + + public static ReplicationLoadSource toReplicationLoadSource( + ClusterStatusProtos.ReplicationLoadSourceMapEntry mapEntry) { + return toReplicationLoadSource(mapEntry.getValue()); } /** @@ -3202,6 +3210,21 @@ public final class ProtobufUtil { .setSizeOfLogQueue((int) rls.getSizeOfLogQueue()) .setTimeStampOfLastShippedOp(rls.getTimestampOfLastShippedOp()) .setReplicationLag(rls.getReplicationLag()) + .setQueueId(rls.getQueueId()) + .setRecovered(rls.isRecovered()) + .setRunning(rls.isRunning()) + .setEditsSinceRestart(rls.hasEditsSinceRestart()) + .setTimeStampOfNextToReplicate(rls.getTimeStampOfNextToReplicate()) + .setOPsShipped(rls.getOPsShipped()) + .setEditsRead(rls.getEditsRead()) + .build(); + } + + public static ClusterStatusProtos.ReplicationLoadSourceMapEntry toReplicationLoadSourceMapEntry( + ReplicationLoadSource rls) { + return 
ClusterStatusProtos.ReplicationLoadSourceMapEntry.newBuilder() + .setKey(rls.getPeerID()) + .setValue(toReplicationLoadSource(rls)) .build(); } diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java index 61e9431156..fa1c5f0869 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java @@ -78,4 +78,7 @@ public interface MetricsReplicationSourceSource extends BaseSource { void incrCompletedWAL(); void incrCompletedRecoveryQueue(); void incrFailedRecoveryQueue(); + long getLogEditsRead(); + long getShippedOps(); + long getEditsFiltered(); } diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java index 4e8c810138..818f368e35 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java @@ -260,4 +260,16 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS public String getMetricsName() { return rms.getMetricsName(); } + + @Override public long getLogEditsRead() { + return this.logReadInEditsCounter.value(); + } + + @Override public long getShippedOps() { + return this.shippedOpsCounter.value(); + } + + @Override public long getEditsFiltered() { + return this.logEditsFilteredCounter.value(); + } } diff --git 
a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java index 0ad50524f1..5ac7648951 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java @@ -314,4 +314,16 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou public String getMetricsName() { return rms.getMetricsName(); } + + @Override public long getLogEditsRead() { + return this.logReadInEditsCounter.value(); + } + + @Override public long getShippedOps() { + return this.shippedOpsCounter.value(); + } + + @Override public long getEditsFiltered() { + return this.logEditsFilteredCounter.value(); + } } diff --git a/hbase-protocol-shaded/src/main/protobuf/ClusterStatus.proto b/hbase-protocol-shaded/src/main/protobuf/ClusterStatus.proto index d39db362ea..d0575eb4d8 100644 --- a/hbase-protocol-shaded/src/main/protobuf/ClusterStatus.proto +++ b/hbase-protocol-shaded/src/main/protobuf/ClusterStatus.proto @@ -161,6 +161,18 @@ message ReplicationLoadSource { required uint32 sizeOfLogQueue = 3; required uint64 timeStampOfLastShippedOp = 4; required uint64 replicationLag = 5; + required uint64 timeStampOfNextToReplicate=6; + required string queueId = 7; + required bool recovered = 8; + required bool running = 9; + required bool editsSinceRestart = 10; + required uint64 editsRead = 11; + required uint64 oPsShipped = 12; +} + +message ReplicationLoadSourceMapEntry { + required string key = 1; + required ReplicationLoadSource value = 2; } message ServerLoad { @@ -206,7 +218,8 @@ message ServerLoad { optional uint32 info_server_port = 9; /** - * The replicationLoadSource for the replication Source 
status of this region server. + * Map of replicationLoadSource instances per peer ID + * for the replication Source status of this region server. */ repeated ReplicationLoadSource replLoadSource = 10; @@ -214,6 +227,12 @@ message ServerLoad { * The replicationLoadSink for the replication Sink status of this region server. */ optional ReplicationLoadSink replLoadSink = 11; + + /** + * Map of replicationLoadSource instances per peer ID + * for the replication Source status of this region server. + */ + repeated ReplicationLoadSourceMapEntry replLoadSourceMap = 12; } message LiveServerInfo { diff --git a/hbase-protocol/src/main/protobuf/ClusterStatus.proto b/hbase-protocol/src/main/protobuf/ClusterStatus.proto index 009d172c36..e6624e82fb 100644 --- a/hbase-protocol/src/main/protobuf/ClusterStatus.proto +++ b/hbase-protocol/src/main/protobuf/ClusterStatus.proto @@ -157,6 +157,19 @@ message ReplicationLoadSource { required uint32 sizeOfLogQueue = 3; required uint64 timeStampOfLastShippedOp = 4; required uint64 replicationLag = 5; + required uint64 timeStampOfLastAttemtpedOp=6; + required string queueId = 7; + required bool recovered = 8; + required bool running = 9; + required bool connectedToPeer = 10; + required bool editsSinceRestart = 11; + required uint64 editsRead = 12; + required uint64 oPsShipped = 13; +} + +message ReplicationLoadSourceMapEntry { + required string key = 1; + required ReplicationLoadSource value = 2; } message ServerLoad { @@ -201,15 +214,23 @@ message ServerLoad { */ optional uint32 info_server_port = 9; - /** - * The replicationLoadSource for the replication Source status of this region server. - */ - repeated ReplicationLoadSource replLoadSource = 10; + /** + * Map of replicationLoadSource instances per peer ID + * for the replication Source status of this region server. + */ + repeated ReplicationLoadSource replLoadSource = 10; + /** * The replicationLoadSink for the replication Sink status of this region server. 
*/ optional ReplicationLoadSink replLoadSink = 11; + + /** + * Map of replicationLoadSource instances per peer ID + * for the replication Source status of this region server. + */ + repeated ReplicationLoadSourceMapEntry replLoadSourceMap = 12; } message LiveServerInfo { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 24743b9126..1615edffe7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -1387,8 +1387,9 @@ public class HRegionServer extends HasThread implements ReplicationLoad rLoad = rsources.refreshAndGetReplicationLoad(); if (rLoad != null) { serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink()); - for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad.getReplicationLoadSourceList()) { - serverLoad.addReplLoadSource(rLS); + for (ClusterStatusProtos.ReplicationLoadSourceMapEntry rLS : + rLoad.getReplicationLoadSourceMapEntries()) { + serverLoad.addReplLoadSourceMap(rLS); } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java index 7db53aa7c1..bd9324d883 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java @@ -376,14 +376,8 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi // replicate the batches to sink side. 
lastWriteTime = parallelReplicate(pool, replicateContext, batches); - // update metrics - if (lastWriteTime > 0) { - this.metrics.setAgeOfLastShippedOp(lastWriteTime, walGroupId); - } return true; } catch (IOException ioe) { - // Didn't ship anything, but must still age the last time we did - this.metrics.refreshAgeOfLastShippedOp(walGroupId); if (ioe instanceof RemoteException) { ioe = ((RemoteException) ioe).unwrapRemoteException(); LOG.warn("Can't replicate because of an error on the remote cluster: ", ioe); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java index 830ebe182d..66addc2e32 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java @@ -41,10 +41,11 @@ public class MetricsSource implements BaseSource { private static final Logger LOG = LoggerFactory.getLogger(MetricsSource.class); // tracks last shipped timestamp for each wal group - private Map lastTimestamps = new HashMap<>(); + private Map lastShippedTimeStamps = new HashMap(); private Map ageOfLastShippedOp = new HashMap<>(); private long lastHFileRefsQueueSize = 0; private String id; + private long timeStampNextToReplicate; private final MetricsReplicationSourceSource singleSourceSource; private final MetricsReplicationSourceSource globalSourceSource; @@ -81,7 +82,7 @@ public class MetricsSource implements BaseSource { /** * Set the age of the last edit that was shipped - * @param timestamp write time of the edit + * @param timestamp target write time of the edit * @param walGroup which group we are setting */ public void setAgeOfLastShippedOp(long timestamp, String walGroup) { @@ -89,7 +90,7 @@ public class MetricsSource implements BaseSource { singleSourceSource.setLastShippedAge(age); 
globalSourceSource.setLastShippedAge(age); this.ageOfLastShippedOp.put(walGroup, age); - this.lastTimestamps.put(walGroup, timestamp); + this.lastShippedTimeStamps.put(walGroup, timestamp); } /** @@ -105,15 +106,6 @@ public class MetricsSource implements BaseSource { .setLastShippedAge(age); } - /** - * get the last timestamp of given wal group. If the walGroup is null, return 0. - * @param walGroup which group we are getting - * @return timeStamp - */ - public long getLastTimeStampOfWalGroup(String walGroup) { - return this.lastTimestamps.get(walGroup) == null ? 0 : lastTimestamps.get(walGroup); - } - /** * get age of last shipped op of given wal group. If the walGroup is null, return 0 * @param walGroup which group we are getting @@ -129,9 +121,9 @@ public class MetricsSource implements BaseSource { * @param walGroupId id of the group to update */ public void refreshAgeOfLastShippedOp(String walGroupId) { - Long lastTimestamp = this.lastTimestamps.get(walGroupId); + Long lastTimestamp = this.lastShippedTimeStamps.get(walGroupId); if (lastTimestamp == null) { - this.lastTimestamps.put(walGroupId, 0L); + this.lastShippedTimeStamps.put(walGroupId, 0L); lastTimestamp = 0L; } if (lastTimestamp > 0) { @@ -198,6 +190,30 @@ public class MetricsSource implements BaseSource { globalSourceSource.incrShippedBytes(sizeInBytes); } + /** + * Gets the number of edits not eligible for replication this source queue logs so far. + * @return logEditsFiltered non-replicable edits filtered from this queue logs. + */ + public long getEditsFiltered(){ + return this.singleSourceSource.getEditsFiltered(); + } + + /** + * Gets the number of edits eligible for replication read from this source queue logs so far. + * @return replicableEdits total number of replicable edits read from this queue logs. 
+ */ + public long getReplicableEdits(){ + return this.singleSourceSource.getLogEditsRead() - this.singleSourceSource.getEditsFiltered(); + } + + /** + * Gets the number of OPs shipped by this source queue to target cluster. + * @return oPsShipped total number of OPs shipped by this source. + */ + public long getOpsShipped() { + return this.singleSourceSource.getShippedOps(); + } + /** * Convience method to apply changes to metrics do to shipping a batch of logs. * @@ -223,8 +239,9 @@ public class MetricsSource implements BaseSource { singleSourceSource.decrSizeOfLogQueue(lastQueueSize); singleSourceSource.clear(); globalSourceSource.decrSizeOfHFileRefsQueue(lastHFileRefsQueueSize); - lastTimestamps.clear(); + lastShippedTimeStamps.clear(); lastHFileRefsQueueSize = 0; + timeStampNextToReplicate = 0; } /** @@ -260,7 +277,7 @@ public class MetricsSource implements BaseSource { */ public long getTimestampOfLastShippedOp() { long lastTimestamp = 0L; - for (long ts : lastTimestamps.values()) { + for (long ts : lastShippedTimeStamps.values()) { if (ts > lastTimestamp) { lastTimestamp = ts; } @@ -268,6 +285,32 @@ public class MetricsSource implements BaseSource { return lastTimestamp; } + /** + * TimeStamp of next edit to be replicated. + * @return timeStampNextToReplicate - TimeStamp of next edit to be replicated. + */ + public long getTimeStampNextToReplicate() { + return timeStampNextToReplicate; + } + + /** + * TimeStamp of next edit targeted for replication. Used for calculating lag, + * as if this timestamp is greater than timestamp of last shipped, it means there's + * at least one edit pending replication. + * @param timeStampNextToReplicate timestamp of next edit in the queue that should be replicated. 
+ */ + public void setTimeStampNextToReplicate(long timeStampNextToReplicate) { + this.timeStampNextToReplicate = timeStampNextToReplicate; + } + + public long getReplicationDelay() { + if(getTimestampOfLastShippedOp()>=timeStampNextToReplicate){ + return 0; + }else{ + return EnvironmentEdgeManager.currentTime() - timeStampNextToReplicate; + } + } + /** * Get the slave peer ID * @return peerID diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java index 799d9750ed..9a449ad21c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java @@ -274,25 +274,12 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer } private void buildReplicationLoad() { - List sourceMetricsList = new ArrayList<>(); - - // get source - List sources = this.replicationManager.getSources(); - for (ReplicationSourceInterface source : sources) { - sourceMetricsList.add(source.getSourceMetrics()); - } - - // get old source - List oldSources = this.replicationManager.getOldSources(); - for (ReplicationSourceInterface source : oldSources) { - if (source instanceof ReplicationSource) { - sourceMetricsList.add(source.getSourceMetrics()); - } - } - + List allSources = new ArrayList<>(); + allSources.addAll(this.replicationManager.getSources()); + allSources.addAll(this.replicationManager.getOldSources()); // get sink MetricsSink sinkMetrics = this.replicationSink.getSinkMetrics(); - this.replicationLoad.buildReplicationLoad(sourceMetricsList, sinkMetrics); + this.replicationLoad.buildReplicationLoad(allSources, sinkMetrics); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java index 53e560b0d5..4f2ab68873 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java @@ -19,14 +19,11 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.util.Date; -import java.util.HashMap; import java.util.List; import java.util.ArrayList; -import java.util.Map; import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Strings; /** @@ -37,11 +34,9 @@ public class ReplicationLoad { // Empty load instance. public static final ReplicationLoad EMPTY_REPLICATIONLOAD = new ReplicationLoad(); - - private List sourceMetricsList; private MetricsSink sinkMetrics; - private List replicationLoadSourceList; + private List replicationLoadSourceMapEntries; private ClusterStatusProtos.ReplicationLoadSink replicationLoadSink; /** default constructor */ @@ -51,13 +46,12 @@ public class ReplicationLoad { /** * buildReplicationLoad - * @param srMetricsList + * @param sources * @param skMetrics */ - public void buildReplicationLoad(final List srMetricsList, + public void buildReplicationLoad(final List sources, final MetricsSink skMetrics) { - this.sourceMetricsList = srMetricsList; this.sinkMetrics = skMetrics; // build the SinkLoad @@ -67,10 +61,9 @@ public class ReplicationLoad { rLoadSinkBuild.setTimeStampsOfLastAppliedOp(sinkMetrics.getTimestampOfLastAppliedOp()); this.replicationLoadSink = rLoadSinkBuild.build(); - // build the SourceLoad List - Map replicationLoadSourceMap = - new HashMap<>(); - for (MetricsSource sm : this.sourceMetricsList) { + this.replicationLoadSourceMapEntries = new ArrayList<>(); + for (ReplicationSourceInterface 
source : sources) { + MetricsSource sm = source.getSourceMetrics(); // Get the actual peer id String peerId = sm.getPeerID(); String[] parts = peerId.split("-", 2); @@ -78,18 +71,11 @@ public class ReplicationLoad { long ageOfLastShippedOp = sm.getAgeOfLastShippedOp(); int sizeOfLogQueue = sm.getSizeOfLogQueue(); + long editsRead = sm.getReplicableEdits(); + long oPsShipped = sm.getOpsShipped(); long timeStampOfLastShippedOp = sm.getTimestampOfLastShippedOp(); - long replicationLag = - calculateReplicationDelay(ageOfLastShippedOp, timeStampOfLastShippedOp, sizeOfLogQueue); - - ClusterStatusProtos.ReplicationLoadSource rLoadSource = replicationLoadSourceMap.get(peerId); - if (rLoadSource != null) { - ageOfLastShippedOp = Math.max(rLoadSource.getAgeOfLastShippedOp(), ageOfLastShippedOp); - sizeOfLogQueue += rLoadSource.getSizeOfLogQueue(); - timeStampOfLastShippedOp = Math.min(rLoadSource.getTimeStampOfLastShippedOp(), - timeStampOfLastShippedOp); - replicationLag = Math.max(rLoadSource.getReplicationLag(), replicationLag); - } + long timeStampOfNextToReplicate = sm.getTimeStampNextToReplicate(); + long replicationLag = sm.getReplicationDelay(); ClusterStatusProtos.ReplicationLoadSource.Builder rLoadSourceBuild = ClusterStatusProtos.ReplicationLoadSource.newBuilder(); rLoadSourceBuild.setPeerID(peerId); @@ -97,33 +83,24 @@ public class ReplicationLoad { rLoadSourceBuild.setSizeOfLogQueue(sizeOfLogQueue); rLoadSourceBuild.setTimeStampOfLastShippedOp(timeStampOfLastShippedOp); rLoadSourceBuild.setReplicationLag(replicationLag); + rLoadSourceBuild.setTimeStampOfNextToReplicate(timeStampOfNextToReplicate); + rLoadSourceBuild.setEditsRead(editsRead); + rLoadSourceBuild.setOPsShipped(oPsShipped); + if (source instanceof ReplicationSource){ + ReplicationSource replSource = (ReplicationSource)source; + rLoadSourceBuild.setRecovered(replSource.getReplicationQueueInfo().isQueueRecovered()); + rLoadSourceBuild.setQueueId(replSource.getReplicationQueueInfo().getQueueId()); + 
rLoadSourceBuild.setRunning(replSource.isWorkerRunning()); + rLoadSourceBuild.setEditsSinceRestart(timeStampOfNextToReplicate>0); + } - replicationLoadSourceMap.put(peerId, rLoadSourceBuild.build()); - } - this.replicationLoadSourceList = new ArrayList<>(replicationLoadSourceMap.values()); - } + ClusterStatusProtos.ReplicationLoadSourceMapEntry.Builder rLoadSourceBuildEntry = + ClusterStatusProtos.ReplicationLoadSourceMapEntry.newBuilder(); + rLoadSourceBuildEntry.setKey(peerId); + rLoadSourceBuildEntry.setValue(rLoadSourceBuild.build()); - static long calculateReplicationDelay(long ageOfLastShippedOp, - long timeStampOfLastShippedOp, int sizeOfLogQueue) { - long replicationLag; - long timePassedAfterLastShippedOp; - if (timeStampOfLastShippedOp == 0) { //replication not start yet, set to Long.MAX_VALUE - return Long.MAX_VALUE; - } else { - timePassedAfterLastShippedOp = - EnvironmentEdgeManager.currentTime() - timeStampOfLastShippedOp; - } - if (sizeOfLogQueue > 1) { - // err on the large side - replicationLag = Math.max(ageOfLastShippedOp, timePassedAfterLastShippedOp); - } else if (timePassedAfterLastShippedOp < 2 * ageOfLastShippedOp) { - replicationLag = ageOfLastShippedOp; // last shipped happen recently - } else { - // last shipped may happen last night, - // so NO real lag although ageOfLastShippedOp is non-zero - replicationLag = 0; + this.replicationLoadSourceMapEntries.add(rLoadSourceBuildEntry.build()); } - return replicationLag; } /** @@ -131,19 +108,18 @@ public class ReplicationLoad { * @return a string contains sourceReplicationLoad information */ public String sourceToString() { - if (this.sourceMetricsList == null) return null; - StringBuilder sb = new StringBuilder(); - for (ClusterStatusProtos.ReplicationLoadSource rls : this.replicationLoadSourceList) { + for (ClusterStatusProtos.ReplicationLoadSourceMapEntry rls : + this.replicationLoadSourceMapEntries) { - sb = Strings.appendKeyValue(sb, "\n PeerID", rls.getPeerID()); - sb = 
Strings.appendKeyValue(sb, "AgeOfLastShippedOp", rls.getAgeOfLastShippedOp()); - sb = Strings.appendKeyValue(sb, "SizeOfLogQueue", rls.getSizeOfLogQueue()); + sb = Strings.appendKeyValue(sb, "\n PeerID", rls.getKey()); + sb = Strings.appendKeyValue(sb, "AgeOfLastShippedOp", rls.getValue().getAgeOfLastShippedOp()); + sb = Strings.appendKeyValue(sb, "SizeOfLogQueue", rls.getValue().getSizeOfLogQueue()); sb = Strings.appendKeyValue(sb, "TimestampsOfLastShippedOp", - (new Date(rls.getTimeStampOfLastShippedOp()).toString())); - sb = Strings.appendKeyValue(sb, "Replication Lag", rls.getReplicationLag()); + (new Date(rls.getValue().getTimeStampOfLastShippedOp()).toString())); + sb = Strings.appendKeyValue(sb, "Replication Lag", rls.getValue().getReplicationLag()); } return sb.toString(); @@ -171,8 +147,9 @@ public class ReplicationLoad { return this.replicationLoadSink; } - public List getReplicationLoadSourceList() { - return this.replicationLoadSourceList; + public List + getReplicationLoadSourceMapEntries() { + return this.replicationLoadSourceMapEntries; } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 10fa50f553..bad14d900c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -179,7 +179,7 @@ public class ReplicationSource implements ReplicationSourceInterface { this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0); this.totalBufferUsed = manager.getTotalBufferUsed(); this.walFileLengthProvider = walFileLengthProvider; - LOG.info("queueId={}, ReplicationSource : {}, currentBandwidth={}", queueId, + LOG.info("queueId={}, ReplicationSource: {}, currentBandwidth={}", queueId, replicationPeer.getId(), 
this.currentBandwidth); } @@ -209,7 +209,8 @@ public class ReplicationSource implements ReplicationSourceInterface { } else { queue.put(log); } - + LOG.trace("Added log file {} to queue of source {}.", logPrefix, + this.replicationQueueInfo.getQueueId()); this.metrics.incrSizeOfLogQueue(); // This will log a warning for each new log that gets created above the warn threshold int queueSize = queue.size(); @@ -319,15 +320,13 @@ public class ReplicationSource implements ReplicationSourceInterface { @Override public Map getWalGroupStatus() { Map sourceReplicationStatus = new TreeMap<>(); - long lastTimeStamp, ageOfLastShippedOp, replicationDelay, fileSize; + long ageOfLastShippedOp, replicationDelay, fileSize; for (Map.Entry walGroupShipper : workerThreads.entrySet()) { String walGroupId = walGroupShipper.getKey(); ReplicationSourceShipper shipper = walGroupShipper.getValue(); - lastTimeStamp = metrics.getLastTimeStampOfWalGroup(walGroupId); ageOfLastShippedOp = metrics.getAgeofLastShippedOp(walGroupId); int queueSize = queues.get(walGroupId).size(); - replicationDelay = - ReplicationLoad.calculateReplicationDelay(ageOfLastShippedOp, lastTimeStamp, queueSize); + replicationDelay = metrics.getReplicationDelay(); Path currentPath = shipper.getCurrentPath(); try { fileSize = getFileSize(currentPath); @@ -476,6 +475,8 @@ public class ReplicationSource implements ReplicationSourceInterface { for (;;) { peerClusterId = replicationEndpoint.getPeerUUID(); if (this.isSourceActive() && peerClusterId == null) { + LOG.debug("Could not connect to Peer ZK. 
Sleeping for " + + (this.sleepForRetries * sleepMultiplier) + " millis."); if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) { sleepMultiplier++; } @@ -493,7 +494,8 @@ public class ReplicationSource implements ReplicationSourceInterface { this.manager.removeSource(this); return; } - LOG.info("Replicating " + clusterId + " -> " + peerClusterId); + LOG.info("Source: {}, is now replicating from cluster: {}; to peer cluster: {};", + this.replicationQueueInfo.getQueueId(), clusterId, peerClusterId); initializeWALEntryFilter(peerClusterId); // start workers @@ -608,6 +610,10 @@ public class ReplicationSource implements ReplicationSourceInterface { return !this.server.isStopped() && this.sourceRunning; } + public UUID getPeerClusterUUID(){ + return this.clusterId; + } + /** * Comparator used to compare logs together based on their start time */ @@ -633,6 +639,19 @@ public class ReplicationSource implements ReplicationSourceInterface { } } + public ReplicationQueueInfo getReplicationQueueInfo() { + return replicationQueueInfo; + } + + public boolean isWorkerRunning(){ + for(ReplicationSourceShipper worker : this.workerThreads.values()){ + if(worker.isActive()){ + return worker.isActive(); + } + } + return false; + } + @Override public String getStats() { StringBuilder sb = new StringBuilder(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 20c1215950..12a903ac11 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -381,6 +381,7 @@ public class ReplicationSourceManager implements ReplicationListener { abortAndThrowIOExceptionWhenFail( () -> 
this.queueStorage.addWAL(server.getServerName(), peerId, walPath.getName())); src.enqueueLog(walPath); + LOG.trace("Enqueued {} to source {} during source creation.", walPath, src.getQueueId()); } } } @@ -789,6 +790,8 @@ public class ReplicationSourceManager implements ReplicationListener { // This only updates the sources we own, not the recovered ones for (ReplicationSourceInterface source : this.sources.values()) { source.enqueueLog(newLog); + LOG.trace("Enqueued {} to source {} while performing postLogRoll operation.", + newLog, source.getQueueId()); } } @@ -960,7 +963,9 @@ public class ReplicationSourceManager implements ReplicationListener { wals.add(wal); } oldsources.add(src); + LOG.trace("Added source for recovered queue: " + src.getQueueId()); for (String wal : walsSet) { + LOG.trace("Enqueueing log from recovered queue for source: " + src.getQueueId()); src.enqueueLog(new Path(oldLogDir, wal)); } src.startup(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java index 5d6198e96e..86e9d52652 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java @@ -28,7 +28,6 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.replication.ReplicationEndpoint; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WALEdit; @@ -92,6 +91,7 @@ public class ReplicationSourceShipper extends Thread { @Override public final void run() { setWorkerState(WorkerState.RUNNING); + LOG.info("Running 
ReplicationSourceShipper Thread for wal group: {}", this.walGroupId); // Loop until we close down while (isActive()) { // Sleep until replication is enabled again @@ -103,10 +103,9 @@ public class ReplicationSourceShipper extends Thread { } try { WALEntryBatch entryBatch = entryReader.poll(getEntriesTimeout); + LOG.debug("Shipper from source {} got entry batch from reader: {}", + source.getQueueId(), entryBatch); if (entryBatch == null) { - // since there is no logs need to replicate, we refresh the ageOfLastShippedOp - source.getSourceMetrics().setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(), - walGroupId); continue; } // the NO_MORE_DATA instance has no path so do not call shipEdits @@ -163,16 +162,13 @@ public class ReplicationSourceShipper extends Thread { List entries = entryBatch.getWalEntries(); int sleepMultiplier = 0; if (entries.isEmpty()) { - if (updateLogPosition(entryBatch)) { - // if there was nothing to ship and it's not an error - // set "ageOfLastShippedOp" to to indicate that we're current - source.getSourceMetrics().setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(), - walGroupId); - } + updateLogPosition(entryBatch); return; } int currentSize = (int) entryBatch.getHeapSize(); int sizeExcludeBulkLoad = getBatchEntrySizeExcludeBulkLoad(entryBatch); + source.getSourceMetrics().setTimeStampNextToReplicate(entries.get(entries.size() - 1) + .getKey().getWriteTime()); while (isActive()) { try { try { @@ -184,7 +180,6 @@ public class ReplicationSourceShipper extends Thread { // directly go back to while() for confirm this continue; } - // create replicateContext here, so the entries can be GC'd upon return from this call // stack ReplicationEndpoint.ReplicateContext replicateContext = @@ -205,7 +200,7 @@ public class ReplicationSourceShipper extends Thread { // Clean up hfile references for (Entry entry : entries) { cleanUpHFileRefs(entry.getEdit()); - + LOG.trace("shipped entry {}: ", entry); TableName tableName = 
entry.getKey().getTableName(); source.getSourceMetrics().setAgeOfLastShippedOpByTable(entry.getKey().getWriteTime(), tableName.getNameAsString()); @@ -224,7 +219,7 @@ source.getSourceMetrics().setAgeOfLastShippedOp( entries.get(entries.size() - 1).getKey().getWriteTime(), walGroupId); if (LOG.isTraceEnabled()) { - LOG.trace("Replicated {} entries or {} operations in {} ms", + LOG.trace("Replicated {} entries or {} operations in {} ms", entries.size(), entryBatch.getNbOperations(), (endTimeNs - startTimeNs) / 1000000); } break; @@ -309,7 +304,7 @@ return 0; } - private boolean isActive() { + public boolean isActive() { return source.isSourceActive() && state == WorkerState.RUNNING && !isInterrupted(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java index b3bdb02940..90a1f138ad 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java @@ -140,7 +140,7 @@ class ReplicationSourceWALReader extends Thread { if (batch != null) { // need to propagate the batch even it has no entries since it may carry the last // sequence id information for serial replication.
- LOG.trace("Read {} WAL entries eligible for replication", batch.getNbEntries()); + LOG.debug("Read {} WAL entries eligible for replication", batch.getNbEntries()); entryBatchQueue.put(batch); sleepMultiplier = 1; } else { // got no entries and didn't advance position in WAL @@ -168,8 +168,11 @@ class ReplicationSourceWALReader extends Thread { protected final boolean addEntryToBatch(WALEntryBatch batch, Entry entry) { WALEdit edit = entry.getEdit(); if (edit == null || edit.isEmpty()) { + LOG.debug("Edit null or empty for entry {} ", entry); return false; } + LOG.debug("updating TimeStampOfLastAttempted to {}, from entry {}, for source queue: {}", + entry.getKey().getWriteTime(), entry.getKey(), this.source.getQueueId()); long entrySize = getEntrySizeIncludeBulkLoad(entry); long entrySizeExcludeBulkLoad = getEntrySizeExcludeBulkLoad(entry); batch.addEntry(entry); @@ -285,7 +288,8 @@ class ReplicationSourceWALReader extends Thread { protected final Entry filterEntry(Entry entry) { Entry filtered = filter.filter(entry); - if (entry != null && filtered == null) { + if (entry != null && (filtered == null || filtered.getEdit().size() == 0)) { + LOG.debug("Filtered entry for replication: {}", entry); source.getSourceMetrics().incrLogEditsFiltered(); } return filtered; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java index 0393af4970..54ebf17538 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java @@ -173,6 +173,7 @@ class WALEntryStream implements Closeable { private void tryAdvanceEntry() throws IOException { if (checkReader()) { boolean beingWritten = readNextEntryAndRecordReaderPosition(); + LOG.trace("reading wal file {}. 
Current open for write: {}", this.currentPath, beingWritten); if (currentEntry == null && !beingWritten) { // no more entries in this log file, and the file is already closed, i.e, rolled // Before dequeueing, we should always get one more attempt at reading. @@ -272,6 +273,7 @@ class WALEntryStream implements Closeable { return true; } if (readEntry != null) { + LOG.trace("reading entry: {} ", readEntry); metrics.incrLogEditsRead(); metrics.incrLogReadInBytes(readerPos - currentPositionOfEntry); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java index cd84293cae..dd60597e11 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java @@ -173,8 +173,7 @@ public class TestReplicationBase { htable1.put(puts); } - @BeforeClass - public static void setUpBeforeClass() throws Exception { + protected static void configureClusters(){ conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1"); // We don't want too many edits per batch sent to the ReplicationEndpoint to trigger // sufficient number of events. But we don't want to go too low because @@ -196,6 +195,17 @@ public class TestReplicationBase { conf1.setLong("hbase.serial.replication.waiting.ms", 100); utility1 = new HBaseTestingUtility(conf1); + + // Base conf2 on conf1 so it gets the right zk cluster. 
+ conf2 = HBaseConfiguration.create(conf1); + conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2"); + conf2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6); + conf2.setBoolean("hbase.tests.use.shortcircuit.reads", false); + + utility2 = new HBaseTestingUtility(conf2); + } + + protected static void startClusters() throws Exception{ utility1.startMiniZKCluster(); MiniZooKeeperCluster miniZK = utility1.getZkCluster(); // Have to reget conf1 in case zk cluster location different @@ -205,13 +215,6 @@ public class TestReplicationBase { admin = new ReplicationAdmin(conf1); LOG.info("Setup first Zk"); - // Base conf2 on conf1 so it gets the right zk cluster. - conf2 = HBaseConfiguration.create(conf1); - conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2"); - conf2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6); - conf2.setBoolean("hbase.tests.use.shortcircuit.reads", false); - - utility2 = new HBaseTestingUtility(conf2); utility2.setZkCluster(miniZK); zkw2 = new ZKWatcher(conf2, "cluster2", null, true); LOG.info("Setup second Zk"); @@ -246,6 +249,12 @@ public class TestReplicationBase { htable2 = connection2.getTable(tableName); } + @BeforeClass + public static void setUpBeforeClass() throws Exception { + configureClusters(); + startClusters(); + } + private boolean peerExist(String peerId) throws IOException { return hbaseAdmin.listReplicationPeers().stream().anyMatch(p -> peerId.equals(p.getPeerId())); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatus.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatus.java index aaa843ef3d..d5b1b5a8bc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatus.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatus.java @@ -18,21 +18,30 @@ package org.apache.hadoop.hbase.replication; import static org.junit.Assert.assertEquals; +import static 
org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.util.EnumSet; import java.util.List; + import org.apache.hadoop.hbase.ClusterMetrics.Option; import org.apache.hadoop.hbase.ClusterStatus; import org.apache.hadoop.hbase.HBaseClassTestRule; + import org.apache.hadoop.hbase.ServerLoad; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Put; + import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.JVMClusterUtil; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -60,7 +69,8 @@ public class TestReplicationStatus extends TestReplicationBase { @Test public void testReplicationStatus() throws Exception { LOG.info("testReplicationStatus"); - + utility2.shutdownMiniHBaseCluster(); + utility2.startMiniHBaseCluster(1,4); try (Admin hbaseAdmin = utility1.getConnection().getAdmin()) { // disable peer admin.disablePeer(PEER_ID); @@ -103,11 +113,203 @@ public class TestReplicationStatus extends TestReplicationBase { ServerLoad sl = status.getLoad(server); List rLoadSourceList = sl.getReplicationLoadSourceList(); // check SourceList still only has one entry - assertTrue("failed to get ReplicationLoadSourceList", (rLoadSourceList.size() == 1)); + assertTrue("failed to get ReplicationLoadSourceList", (rLoadSourceList.size() == 2)); assertEquals(PEER_ID, rLoadSourceList.get(0).getPeerID()); } finally { admin.enablePeer(PEER_ID); utility1.getHBaseCluster().getRegionServer(1).start(); } } + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + //we need to perform initialisations from TestReplicationBase.setUpBeforeClass() on each + 
//test here, so we override BeforeClass to do nothing and call + // TestReplicationBase.setUpBeforeClass() from setup method + TestReplicationBase.configureClusters(); + } + + @Before + @Override + public void setUpBase() throws Exception { + TestReplicationBase.startClusters(); + super.setUpBase(); + } + + @After + @Override + public void tearDownBase() throws Exception { + utility2.shutdownMiniCluster(); + utility1.shutdownMiniCluster(); + } + + @AfterClass + public static void tearDownAfterClass(){ + //We need to override it here to avoid issues when trying to execute super class teardown + } + + @Test + public void testReplicationStatusSourceStartedTargetStoppedNoOps() throws Exception { + utility2.shutdownMiniHBaseCluster(); + utility1.shutdownMiniHBaseCluster(); + utility1.startMiniHBaseCluster(); + Admin hbaseAdmin = utility1.getConnection().getAdmin(); + ServerName serverName = utility1.getHBaseCluster(). + getRegionServer(0).getServerName(); + ClusterStatus status = new ClusterStatus(hbaseAdmin. + getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS))); + List loadSources = status.getLiveServerMetrics(). 
+ get(serverName).getReplicationLoadSourceList(); + assertEquals(1, loadSources.size()); + ReplicationLoadSource loadSource = loadSources.get(0); + assertFalse(loadSource.hasEditsSinceRestart()); + assertEquals(0, loadSource.getTimestampOfLastShippedOp()); + assertEquals(0, loadSource.getReplicationLag()); + assertFalse(loadSource.isRecovered()); + } + + @Test + public void testReplicationStatusSourceStartedTargetStoppedNewOp() throws Exception { + utility2.shutdownMiniHBaseCluster(); + utility1.shutdownMiniHBaseCluster(); + utility1.startMiniHBaseCluster(); + Admin hbaseAdmin = utility1.getConnection().getAdmin(); + //add some values to source cluster + for (int i = 0; i < NB_ROWS_IN_BATCH; i++) { + Put p = new Put(Bytes.toBytes("row" + i)); + p.addColumn(famName, Bytes.toBytes("col1"), Bytes.toBytes("val" + i)); + htable1.put(p); + } + Thread.sleep(10000); + ServerName serverName = utility1.getHBaseCluster(). + getRegionServer(0).getServerName(); + ClusterStatus status = new ClusterStatus(hbaseAdmin. + getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS))); + List loadSources = status.getLiveServerMetrics(). 
+ get(serverName).getReplicationLoadSourceList(); + assertEquals(1, loadSources.size()); + ReplicationLoadSource loadSource = loadSources.get(0); + assertTrue(loadSource.hasEditsSinceRestart()); + assertEquals(0, loadSource.getTimestampOfLastShippedOp()); + assertTrue(loadSource.getReplicationLag()>0); + assertFalse(loadSource.isRecovered()); + } + + @Test + public void testReplicationStatusSourceStartedTargetStoppedWithRecovery() throws Exception { + utility2.shutdownMiniHBaseCluster(); + utility1.shutdownMiniHBaseCluster(); + utility1.startMiniHBaseCluster(); + //add some values to cluster 1 + for (int i = 0; i < NB_ROWS_IN_BATCH; i++) { + Put p = new Put(Bytes.toBytes("row" + i)); + p.addColumn(famName, Bytes.toBytes("col1"), Bytes.toBytes("val" + i)); + htable1.put(p); + } + Thread.sleep(10000); + utility1.shutdownMiniHBaseCluster(); + utility1.startMiniHBaseCluster(); + Admin hbaseAdmin = utility1.getConnection().getAdmin(); + ServerName serverName = utility1.getHBaseCluster(). + getRegionServer(0).getServerName(); + Thread.sleep(10000); + ClusterStatus status = new ClusterStatus(hbaseAdmin. + getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS))); + List loadSources = status.getLiveServerMetrics(). 
+ get(serverName).getReplicationLoadSourceList(); + assertEquals(2, loadSources.size()); + boolean foundRecovery = false; + boolean foundNormal = false; + for(ReplicationLoadSource loadSource : loadSources){ + if (loadSource.isRecovered()){ + foundRecovery = true; + assertTrue(loadSource.hasEditsSinceRestart()); + assertEquals(0, loadSource.getTimestampOfLastShippedOp()); + assertTrue(loadSource.getReplicationLag()>0); + } else { + foundNormal = true; + assertFalse(loadSource.hasEditsSinceRestart()); + assertEquals(0, loadSource.getTimestampOfLastShippedOp()); + assertEquals(0, loadSource.getReplicationLag()); + } + } + assertTrue("No normal queue found.", foundNormal); + assertTrue("No recovery queue found.", foundRecovery); + } + + @Test + public void testReplicationStatusBothNormalAndRecoveryLagging() throws Exception { + utility2.shutdownMiniHBaseCluster(); + utility1.shutdownMiniHBaseCluster(); + utility1.startMiniHBaseCluster(); + //add some values to cluster 1 + for (int i = 0; i < NB_ROWS_IN_BATCH; i++) { + Put p = new Put(Bytes.toBytes("row" + i)); + p.addColumn(famName, Bytes.toBytes("col1"), Bytes.toBytes("val" + i)); + htable1.put(p); + } + Thread.sleep(10000); + utility1.shutdownMiniHBaseCluster(); + utility1.startMiniHBaseCluster(); + Admin hbaseAdmin = utility1.getConnection().getAdmin(); + ServerName serverName = utility1.getHBaseCluster(). + getRegionServer(0).getServerName(); + Thread.sleep(10000); + //add more values to cluster 1, these should cause normal queue to lag + for (int i = 0; i < NB_ROWS_IN_BATCH; i++) { + Put p = new Put(Bytes.toBytes("row" + i)); + p.addColumn(famName, Bytes.toBytes("col1"), Bytes.toBytes("val" + i)); + htable1.put(p); + } + Thread.sleep(10000); + ClusterStatus status = new ClusterStatus(hbaseAdmin. + getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS))); + List loadSources = status.getLiveServerMetrics(). 
+ get(serverName).getReplicationLoadSourceList(); + assertEquals(2, loadSources.size()); + boolean foundRecovery = false; + boolean foundNormal = false; + for(ReplicationLoadSource loadSource : loadSources){ + if (loadSource.isRecovered()){ + foundRecovery = true; + } else { + foundNormal = true; + } + assertTrue(loadSource.hasEditsSinceRestart()); + assertEquals(0, loadSource.getTimestampOfLastShippedOp()); + assertTrue(loadSource.getReplicationLag()>0); + } + assertTrue("No normal queue found.", foundNormal); + assertTrue("No recovery queue found.", foundRecovery); + } + + @Test + public void testReplicationStatusAfterLagging() throws Exception { + utility2.shutdownMiniHBaseCluster(); + utility1.shutdownMiniHBaseCluster(); + utility1.startMiniHBaseCluster(); + //add some values to cluster 1 + for (int i = 0; i < NB_ROWS_IN_BATCH; i++) { + Put p = new Put(Bytes.toBytes("row" + i)); + p.addColumn(famName, Bytes.toBytes("col1"), Bytes.toBytes("val" + i)); + htable1.put(p); + } + utility2.startMiniHBaseCluster(); + Thread.sleep(10000); + try(Admin hbaseAdmin = utility1.getConnection().getAdmin()) { + ServerName serverName = utility1.getHBaseCluster().getRegionServer(0). + getServerName(); + ClusterStatus status = + new ClusterStatus(hbaseAdmin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS))); + List loadSources = status.getLiveServerMetrics().get(serverName). 
+ getReplicationLoadSourceList(); + assertEquals(1, loadSources.size()); + ReplicationLoadSource loadSource = loadSources.get(0); + assertTrue(loadSource.hasEditsSinceRestart()); + assertTrue(loadSource.getTimestampOfLastShippedOp() > 0); + assertEquals(0, loadSource.getReplicationLag()); + }finally{ + utility2.shutdownMiniHBaseCluster(); + } + } } diff --git a/hbase-shell/src/main/ruby/hbase/admin.rb b/hbase-shell/src/main/ruby/hbase/admin.rb index 898feae8e9..968f03b45b 100644 --- a/hbase-shell/src/main/ruby/hbase/admin.rb +++ b/hbase-shell/src/main/ruby/hbase/admin.rb @@ -789,24 +789,46 @@ module Hbase puts(format('%d live servers', status.getServersSize)) for server in status.getServers sl = status.getLoad(server) - rSinkString = ' SINK :' + rSinkString = " SINK:" rSourceString = ' SOURCE:' rLoadSink = sl.getReplicationLoadSink next if rLoadSink.nil? rSinkString << ' AgeOfLastAppliedOp=' + rLoadSink.getAgeOfLastAppliedOp.to_s rSinkString << ', TimeStampsOfLastAppliedOp=' + java.util.Date.new(rLoadSink.getTimeStampsOfLastAppliedOp).toString - rLoadSourceList = sl.getReplicationLoadSourceList + rLoadSourceMap = sl.getReplicationLoadSourceMap() index = 0 - while index < rLoadSourceList.size - rLoadSource = rLoadSourceList.get(index) - rSourceString << ' PeerID=' + rLoadSource.getPeerID - rSourceString << ', AgeOfLastShippedOp=' + rLoadSource.getAgeOfLastShippedOp.to_s - rSourceString << ', SizeOfLogQueue=' + rLoadSource.getSizeOfLogQueue.to_s - rSourceString << ', TimeStampsOfLastShippedOp=' + - java.util.Date.new(rLoadSource.getTimeStampOfLastShippedOp).toString - rSourceString << ', Replication Lag=' + rLoadSource.getReplicationLag.to_s - index += 1 + for peer in rLoadSourceMap.keySet() + rSourceString << " PeerID=" + peer + for sourceLoad in rLoadSourceMap.get(peer) + if sourceLoad.isRecovered() + rSourceString << "\n Recovered Queue: " + else + rSourceString << "\n Normal Queue: " + end + rSourceString << sourceLoad.getQueueId() + if (sourceLoad.isRunning()) + 
if (sourceLoad.getTimeStampOfLastShippedOp() == 0) + rSourceString << "\n No Ops shipped since last restart" + else + rSourceString << "\n AgeOfLastShippedOp=" + sourceLoad.getAgeOfLastShippedOp().to_s + rSourceString << ", TimeStampOfLastShippedOp=" + + (java.util.Date.new(sourceLoad.getTimeStampOfLastShippedOp())).toString() + end + rSourceString << ", SizeOfLogQueue=" + sourceLoad.getSizeOfLogQueue().to_s + rSourceString << ", EditsReadFromLogQueue=" + sourceLoad.getEditsRead().to_s + rSourceString << ", OpsShippedToTarget=" + sourceLoad.getoPsShipped().to_s + if (sourceLoad.hasEditsSinceRestart() ) + rSourceString << ", TimeStampOfNextToReplicate=" + + (java.util.Date.new(sourceLoad.getTimeStampOfNextToReplicate)).toString() + else + rSourceString << ", No edits for this source since it started" + end + rSourceString << ", Replication Lag=" + sourceLoad.getReplicationLag().to_s + else + rSourceString << "\n No Reader/Shipper threads running yet." + end + end end puts(format(' %s:', server.getHostname)) if type.casecmp('SOURCE') == 0 -- 2.17.2 (Apple Git-113)