From 8d8f509721a37d1ba0d4efc2c9da981af501d92d Mon Sep 17 00:00:00 2001 From: Wellington Chevreuil Date: Mon, 10 Dec 2018 17:17:54 +0000 Subject: [PATCH] HBASE-21406 - Added new metric to differentiate sink startup time from last OP applied time. Original behaviour was to always set startup time to TimestampsOfLastAppliedOp, and always show it on "status 'replication'" command, regardless of whether the sink had ever applied any OP. This was confusing, especially in scenarios where the cluster was acting only as a source: the output could lead to wrong interpretations about the sink not applying edits or replication being stuck. With the new metric, we now compare the two metric values, assuming that if both are the same, no OP has ever been shipped to the given sink, so the output reflects that more clearly, for example: SINK: TimeStampStarted=Thu Dec 06 23:59:47 GMT 2018, Waiting for OPs... --- .../replication/ReplicationLoadSink.java | 15 +++++- .../hbase/shaded/protobuf/ProtobufUtil.java | 7 ++- .../MetricsReplicationSinkSource.java | 1 + .../MetricsReplicationSinkSourceImpl.java | 2 + .../src/main/protobuf/ClusterStatus.proto | 2 + .../src/main/protobuf/ClusterStatus.proto | 2 + .../replication/regionserver/MetricsSink.java | 18 +++++++ .../regionserver/ReplicationLoad.java | 2 + .../replication/TestReplicationStatus.java | 49 ++++++++++++++++--- hbase-shell/src/main/ruby/hbase/admin.rb | 12 +++-- 10 files changed, 97 insertions(+), 13 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationLoadSink.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationLoadSink.java index d9698ca7d6..c797482875 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationLoadSink.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationLoadSink.java @@ -19,12 +19,17 @@ import org.apache.yetus.audience.InterfaceAudience; public class ReplicationLoadSink { 
private final long ageOfLastAppliedOp; private final long timestampsOfLastAppliedOp; + private final long timeStampStarted; + private final long totalOpsProcessed; // TODO: add the builder for this class @InterfaceAudience.Private - public ReplicationLoadSink(long age, long timestamp) { + public ReplicationLoadSink(long age, long timestamp, long timeStampStarted, + long totalOpsProcessed) { this.ageOfLastAppliedOp = age; this.timestampsOfLastAppliedOp = timestamp; + this.timeStampStarted = timeStampStarted; + this.totalOpsProcessed = totalOpsProcessed; } public long getAgeOfLastAppliedOp() { @@ -43,4 +48,12 @@ public class ReplicationLoadSink { public long getTimestampsOfLastAppliedOp() { return this.timestampsOfLastAppliedOp; } + + public long getTimeStampStarted() { + return timeStampStarted; + } + + public long getTotalOpsProcessed() { + return totalOpsProcessed; + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java index cf4c831808..e1f732b519 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java @@ -2713,7 +2713,10 @@ public final class ProtobufUtil { public static ReplicationLoadSink toReplicationLoadSink( ClusterStatusProtos.ReplicationLoadSink rls) { - return new ReplicationLoadSink(rls.getAgeOfLastAppliedOp(), rls.getTimeStampsOfLastAppliedOp()); + return new ReplicationLoadSink(rls.getAgeOfLastAppliedOp(), + rls.getTimeStampsOfLastAppliedOp(), + rls.getTimeStampStarted(), + rls.getTotalOpsProcessed()); } public static ReplicationLoadSource toReplicationLoadSource( @@ -3236,6 +3239,8 @@ public final class ProtobufUtil { return ClusterStatusProtos.ReplicationLoadSink.newBuilder() .setAgeOfLastAppliedOp(rls.getAgeOfLastAppliedOp()) 
.setTimeStampsOfLastAppliedOp(rls.getTimestampsOfLastAppliedOp()) + .setTimeStampStarted(rls.getTimeStampStarted()) + .setTotalOpsProcessed(rls.getTotalOpsProcessed()) .build(); } diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSource.java index 1d6251b993..2498e3426a 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSource.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSource.java @@ -32,4 +32,5 @@ public interface MetricsReplicationSinkSource { void incrAppliedOps(long batchsize); long getLastAppliedOpAge(); void incrAppliedHFiles(long hfileSize); + long getSinkAppliedOps(); } diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSourceImpl.java index 485764e7c8..68194dfa61 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSourceImpl.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSourceImpl.java @@ -58,4 +58,6 @@ public class MetricsReplicationSinkSourceImpl implements MetricsReplicationSinkS public void incrAppliedHFiles(long hfiles) { hfilesCounter.incr(hfiles); } + + @Override public long getSinkAppliedOps() { return opsCounter.value(); } } diff --git a/hbase-protocol-shaded/src/main/protobuf/ClusterStatus.proto b/hbase-protocol-shaded/src/main/protobuf/ClusterStatus.proto index d39db362ea..69fc7fc754 100644 --- a/hbase-protocol-shaded/src/main/protobuf/ClusterStatus.proto +++ 
b/hbase-protocol-shaded/src/main/protobuf/ClusterStatus.proto @@ -153,6 +153,8 @@ message RegionLoad { message ReplicationLoadSink { required uint64 ageOfLastAppliedOp = 1; required uint64 timeStampsOfLastAppliedOp = 2; + required uint64 timeStampStarted = 3; + required uint64 totalOpsProcessed = 4; } message ReplicationLoadSource { diff --git a/hbase-protocol/src/main/protobuf/ClusterStatus.proto b/hbase-protocol/src/main/protobuf/ClusterStatus.proto index 009d172c36..35bd314141 100644 --- a/hbase-protocol/src/main/protobuf/ClusterStatus.proto +++ b/hbase-protocol/src/main/protobuf/ClusterStatus.proto @@ -149,6 +149,8 @@ message RegionLoad { message ReplicationLoadSink { required uint64 ageOfLastAppliedOp = 1; required uint64 timeStampsOfLastAppliedOp = 2; + required uint64 timeStampStarted = 3; + required uint64 totalOpsProcessed = 4; } message ReplicationLoadSource { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java index eafb54ccb1..6531db64a4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.CompatibilitySingletonFactory; public class MetricsSink { private long lastTimestampForAge = System.currentTimeMillis(); + private long startTimestamp = System.currentTimeMillis(); private final MetricsReplicationSinkSource mss; public MetricsSink() { @@ -110,4 +111,21 @@ public class MetricsSink { public long getTimestampOfLastAppliedOp() { return this.lastTimestampForAge; } + + /** + * Gets the time stamp from when the Sink was initialized. + * @return startTimestamp + */ + public long getStartTimestamp() { + return startTimestamp; + } + + /** + * Gets the total number of OPs delivered to target by this sink. 
+ * @return totalAppliedOps + */ + public long getAppliedOps() { + return this.mss.getSinkAppliedOps(); + } + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java index 53e560b0d5..4771498e3f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java @@ -65,6 +65,8 @@ public class ReplicationLoad { ClusterStatusProtos.ReplicationLoadSink.newBuilder(); rLoadSinkBuild.setAgeOfLastAppliedOp(sinkMetrics.getAgeOfLastAppliedOp()); rLoadSinkBuild.setTimeStampsOfLastAppliedOp(sinkMetrics.getTimestampOfLastAppliedOp()); + rLoadSinkBuild.setTimeStampStarted(sinkMetrics.getStartTimestamp()); + rLoadSinkBuild.setTotalOpsProcessed(sinkMetrics.getAppliedOps()); this.replicationLoadSink = rLoadSinkBuild.build(); // build the SourceLoad List diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatus.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatus.java index aaa843ef3d..c3970dc7d8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatus.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatus.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.replication; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import java.io.IOException; import java.util.EnumSet; import java.util.List; import org.apache.hadoop.hbase.ClusterMetrics.Option; @@ -49,6 +50,15 @@ public class TestReplicationStatus extends TestReplicationBase { private static final Logger LOG = LoggerFactory.getLogger(TestReplicationStatus.class); private static final String PEER_ID = "2"; + private void 
insertRowsOnSource() throws IOException { + final byte[] qualName = Bytes.toBytes("q"); + for (int i = 0; i < NB_ROWS_IN_BATCH; i++) { + Put p = new Put(Bytes.toBytes("row" + i)); + p.addColumn(famName, qualName, Bytes.toBytes("val" + i)); + htable1.put(p); + } + } + /** * Test for HBASE-9531 * put a few rows into htable1, which should be replicated to htable2 @@ -65,14 +75,7 @@ public class TestReplicationStatus extends TestReplicationBase { // disable peer admin.disablePeer(PEER_ID); - final byte[] qualName = Bytes.toBytes("q"); - Put p; - - for (int i = 0; i < NB_ROWS_IN_BATCH; i++) { - p = new Put(Bytes.toBytes("row" + i)); - p.addColumn(famName, qualName, Bytes.toBytes("val" + i)); - htable1.put(p); - } + insertRowsOnSource(); ClusterStatus status = new ClusterStatus(hbaseAdmin.getClusterMetrics( EnumSet.of(Option.LIVE_SERVERS))); @@ -110,4 +113,34 @@ public class TestReplicationStatus extends TestReplicationBase { utility1.getHBaseCluster().getRegionServer(1).start(); } } + + @Test + public void testReplicationStatusSink() throws Exception { + try (Admin hbaseAdmin = utility2.getConnection().getAdmin()) { + ClusterStatus status = new ClusterStatus(hbaseAdmin + .getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS))); + for(int i=1; i<4; i++) { + utility2.getHBaseCluster().getRegionServer(i).stop("Stopping additional RS"); + } + Thread.sleep(1000); + ServerName server = utility2.getHBaseCluster().getRegionServer(0).getServerName(); + ServerLoad sl = status.getLoad(server); + ReplicationLoadSink loadSink = sl.getReplicationLoadSink(); + //First check that the timestamp of last applied op is same as RS start, since no edits + //were replicated yet + assertEquals(loadSink.getTimeStampStarted(), loadSink.getTimestampsOfLastAppliedOp()); + //now insert some rows on source, so that they get delivered to target + insertRowsOnSource(); + Thread.sleep(1000); + status = new ClusterStatus(hbaseAdmin + .getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS))); + sl = 
status.getLoad(server); + loadSink = sl.getReplicationLoadSink(); + assertTrue(loadSink.getTimestampsOfLastAppliedOp()>loadSink.getTimeStampStarted()); + } finally { + for(int i=1; i<4; i++) { + utility2.getHBaseCluster().getRegionServer(i).start(); + } + } + } } diff --git a/hbase-shell/src/main/ruby/hbase/admin.rb b/hbase-shell/src/main/ruby/hbase/admin.rb index 898feae8e9..d47df93fae 100644 --- a/hbase-shell/src/main/ruby/hbase/admin.rb +++ b/hbase-shell/src/main/ruby/hbase/admin.rb @@ -793,9 +793,15 @@ module Hbase rSourceString = ' SOURCE:' rLoadSink = sl.getReplicationLoadSink next if rLoadSink.nil? - rSinkString << ' AgeOfLastAppliedOp=' + rLoadSink.getAgeOfLastAppliedOp.to_s - rSinkString << ', TimeStampsOfLastAppliedOp=' + - java.util.Date.new(rLoadSink.getTimeStampsOfLastAppliedOp).toString + if (rLoadSink.getTimeStampsOfLastAppliedOp() == rLoadSink.getTimeStampStarted()) + rSinkString << " TimeStampStarted=" + rLoadSink.getTimeStampStarted().to_s + rSinkString << ", Waiting for OPs... " + else + rSinkString << " TimeStampStarted=" + rLoadSink.getTimeStampStarted().to_s + rSinkString << ", AgeOfLastAppliedOp=" + rLoadSink.getAgeOfLastAppliedOp().to_s + rSinkString << ", TimeStampsOfLastAppliedOp=" + + (java.util.Date.new(rLoadSink.getTimeStampsOfLastAppliedOp())).toString() + end rLoadSourceList = sl.getReplicationLoadSourceList index = 0 while index < rLoadSourceList.size -- 2.17.2 (Apple Git-113)