From 0fa340337537acfbdf5dd1732667e76251741419 Mon Sep 17 00:00:00 2001 From: Peter Somogyi Date: Sun, 19 Nov 2017 14:42:39 +0100 Subject: [PATCH] HBASE-18936 Backport HBASE-16870 to branch-1.3 (Add the metrics of replication sources which were transformed from other dead rs to ReplicationLoad) --- .../replication/regionserver/MetricsSource.java | 2 +- .../replication/regionserver/Replication.java | 13 ++- .../replication/regionserver/ReplicationLoad.java | 26 +++++- .../hbase/replication/TestReplicationStatus.java | 100 +++++++++++++++++++++ 4 files changed, 134 insertions(+), 7 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatus.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java index 0ed1c2e732..9b99f2abc0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java @@ -67,7 +67,7 @@ public class MetricsSource { public void setAgeOfLastShippedOp(long timestamp, String walGroup) { long age = EnvironmentEdgeManager.currentTime() - timestamp; singleSourceSource.setLastShippedAge(age); - globalSourceSource.setLastShippedAge(age); + globalSourceSource.setLastShippedAge(Math.max(age, globalSourceSource.getLastShippedAge())); this.lastTimeStamps.put(walGroup, timestamp); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java index 73e90c3786..be6f7a79fd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java @@ -434,13 +434,22 @@ public class Replication extends WALActionsListener.Base implements } private void buildReplicationLoad() { - // get source - List sources = this.replicationManager.getSources(); List sourceMetricsList = new ArrayList(); + // get source + List sources = this.replicationManager.getSources(); for (ReplicationSourceInterface source : sources) { sourceMetricsList.add(source.getSourceMetrics()); } + + // get old source + List oldSources = this.replicationManager.getOldSources(); + for (ReplicationSourceInterface source : oldSources) { + if (source instanceof ReplicationSource) { + sourceMetricsList.add(((ReplicationSource) source).getSourceMetrics()); + } + } + // get sink MetricsSink sinkMetrics = this.replicationSink.getSinkMetrics(); this.replicationLoad.buildReplicationLoad(sourceMetricsList, sinkMetrics); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java index 8dd42bc786..2ead3df99e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java @@ -19,8 +19,10 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.util.Date; +import java.util.HashMap; import java.util.List; import java.util.ArrayList; +import java.util.Map; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos; @@ -66,8 +68,14 @@ public class ReplicationLoad { this.replicationLoadSink = rLoadSinkBuild.build(); // build the SourceLoad List - this.replicationLoadSourceList = new ArrayList(); + Map replicationLoadSourceMap = + new HashMap(); for (MetricsSource sm : this.sourceMetricsList) { + // Get the actual peer id + String peerId = sm.getPeerID(); + String[] parts = peerId.split("-", 2); + peerId = parts.length != 1 ? parts[0] : peerId; + long ageOfLastShippedOp = sm.getAgeOfLastShippedOp(); int sizeOfLogQueue = sm.getSizeOfLogQueue(); long timeStampOfLastShippedOp = sm.getTimeStampOfLastShippedOp(); @@ -85,17 +93,27 @@ public class ReplicationLoad { replicationLag = 0; } + ClusterStatusProtos.ReplicationLoadSource rLoadSource = replicationLoadSourceMap.get(peerId); + if (rLoadSource != null) { + ageOfLastShippedOp = Math.max(rLoadSource.getAgeOfLastShippedOp(), ageOfLastShippedOp); + sizeOfLogQueue += rLoadSource.getSizeOfLogQueue(); + timeStampOfLastShippedOp = Math.min(rLoadSource.getTimeStampOfLastShippedOp(), + timeStampOfLastShippedOp); + replicationLag = Math.max(rLoadSource.getReplicationLag(), replicationLag); + } + ClusterStatusProtos.ReplicationLoadSource.Builder rLoadSourceBuild = ClusterStatusProtos.ReplicationLoadSource.newBuilder(); - rLoadSourceBuild.setPeerID(sm.getPeerID()); + rLoadSourceBuild.setPeerID(peerId); rLoadSourceBuild.setAgeOfLastShippedOp(ageOfLastShippedOp); rLoadSourceBuild.setSizeOfLogQueue(sizeOfLogQueue); rLoadSourceBuild.setTimeStampOfLastShippedOp(timeStampOfLastShippedOp); rLoadSourceBuild.setReplicationLag(replicationLag); - this.replicationLoadSourceList.add(rLoadSourceBuild.build()); + replicationLoadSourceMap.put(peerId, rLoadSourceBuild.build()); } - + this.replicationLoadSourceList = new ArrayList( + replicationLoadSourceMap.values()); } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatus.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatus.java new file mode 100644 index 0000000000..5ef6c9fd98 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatus.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.ClusterStatus; +import org.apache.hadoop.hbase.ServerLoad; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(MediumTests.class) +public class TestReplicationStatus extends TestReplicationBase { + private static final Log LOG = LogFactory.getLog(TestReplicationStatus.class); + private static final String PEER_ID = "2"; + + /** + * Test for HBASE-9531 + * put a few rows into htable1, which should be replicated to htable2 + * create a ClusterStatus instance 'status' from HBaseAdmin + * test : status.getLoad(server).getReplicationLoadSourceList() + * test : status.getLoad(server).getReplicationLoadSink() + * * @throws Exception + */ + @Test(timeout = 300000) + public void testReplicationStatus() throws Exception { + LOG.info("testReplicationStatus"); + + try (Admin hbaseAdmin = utility1.getConnection().getAdmin()) { + // disable peer + admin.disablePeer(PEER_ID); + + final byte[] qualName = Bytes.toBytes("q"); + Put p; + + for (int i = 0; i < NB_ROWS_IN_BATCH; i++) { + p = new Put(Bytes.toBytes("row" + i)); + p.add(famName, qualName, Bytes.toBytes("val" + i)); + htable1.put(p); + } + + ClusterStatus status = hbaseAdmin.getClusterStatus(); + + for (ServerName server : status.getServers()) { + ServerLoad sl = status.getLoad(server); + List rLoadSourceList = sl.getReplicationLoadSourceList(); + ReplicationLoadSink rLoadSink = sl.getReplicationLoadSink(); + + // check SourceList only has one entry, beacuse only has one peer + assertTrue("failed to get ReplicationLoadSourceList", (rLoadSourceList.size() == 1)); + assertEquals(PEER_ID, rLoadSourceList.get(0).getPeerID()); + + // check Sink exist only as it is difficult to verify the value on the fly + assertTrue("failed to get ReplicationLoadSink.AgeOfLastShippedOp ", + (rLoadSink.getAgeOfLastAppliedOp() >= 0)); + assertTrue("failed to get ReplicationLoadSink.TimeStampsOfLastAppliedOp ", + (rLoadSink.getTimeStampsOfLastAppliedOp() >= 0)); + } + + // Stop rs1, then the queue of rs1 will be transfered to rs0 + utility1.getHBaseCluster().getRegionServer(1).stop("Stop RegionServer"); + Thread.sleep(10000); + status = hbaseAdmin.getClusterStatus(); + ServerName server = utility1.getHBaseCluster().getRegionServer(0).getServerName(); + ServerLoad sl = status.getLoad(server); + List rLoadSourceList = sl.getReplicationLoadSourceList(); + // check SourceList still only has one entry + assertTrue("failed to get ReplicationLoadSourceList", (rLoadSourceList.size() == 1)); + assertEquals(PEER_ID, rLoadSourceList.get(0).getPeerID()); + } finally { + admin.enablePeer(PEER_ID); + utility1.getHBaseCluster().getRegionServer(1).start(); + } + } +} -- 2.15.0