From d001b983be6094684975538665ba6c469811f1b9 Mon Sep 17 00:00:00 2001 From: Xu Cang Date: Wed, 13 Jun 2018 00:06:05 -0700 Subject: [PATCH] HBASE-20695 Add table level metrics for regionserver --- .../replication/regionserver/MetricsSource.java | 27 ++++++++++++++++++++-- .../regionserver/ReplicationSourceShipper.java | 9 ++++++-- .../hbase/replication/TestReplicationEndpoint.java | 21 ++++++++++++++++- 3 files changed, 52 insertions(+), 5 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java index a59dd72..c7e6421 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java @@ -29,6 +29,8 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.metrics.BaseSource; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; + /** * This class is for maintaining the various replication statistics for a source and publishing them * through the metrics interfaces. @@ -45,7 +47,7 @@ public class MetricsSource implements BaseSource { private final MetricsReplicationSourceSource singleSourceSource; private final MetricsReplicationSourceSource globalSourceSource; - + private Map singleSourceSourceByTable; /** * Constructor used to register the metrics @@ -58,6 +60,7 @@ public class MetricsSource implements BaseSource { CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class) .getSource(id); globalSourceSource = CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class).getGlobalSource(); + singleSourceSourceByTable = new HashMap<>(); } /** @@ -67,10 +70,12 @@ public class MetricsSource implements BaseSource { * @param globalSourceSource Class to monitor global-scoped metrics */ public MetricsSource(String id, MetricsReplicationSourceSource singleSourceSource, - MetricsReplicationSourceSource globalSourceSource) { + MetricsReplicationSourceSource globalSourceSource, + Map singleSourceSourceByTable) { this.id = id; this.singleSourceSource = singleSourceSource; this.globalSourceSource = globalSourceSource; + this.singleSourceSourceByTable = singleSourceSourceByTable; } /** @@ -86,6 +91,19 @@ public class MetricsSource implements BaseSource { } /** + * Set the age of the last edit that was shipped group by table + * @param timestamp write time of the edit + * @param tableName String as group and tableName + */ + public void setAgeOfLastShippedOpByTable(long timestamp, String tableName) { + this.getSingleSourceSourceByTable().computeIfAbsent( + tableName, t -> CompatibilitySingletonFactory + .getInstance(MetricsReplicationSourceFactory.class).getSource(t)); + long age = EnvironmentEdgeManager.currentTime() - timestamp; + singleSourceSourceByTable.get(tableName).setLastShippedAge(age); + } + + /** * Convenience method to use the last given timestamp to refresh the age of the last edit. Used * when replication fails and need to keep that metric accurate. * @param walGroupId id of the group to update @@ -349,4 +367,9 @@ public class MetricsSource implements BaseSource { public String getMetricsName() { return globalSourceSource.getMetricsName(); } + + @VisibleForTesting + public Map getSingleSourceSourceByTable() { + return singleSourceSourceByTable; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java index 123ecbe..51df46a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java @@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Threads; @@ -184,6 +185,10 @@ public class ReplicationSourceShipper extends Thread { // Clean up hfile references for (Entry entry : entries) { cleanUpHFileRefs(entry.getEdit()); + + TableName tableName = entry.getKey().getTableName(); + source.getSourceMetrics().setAgeOfLastShippedOpByTable(entry.getKey().getWriteTime(), + tableName.getNameAsString()); } // Log and clean up WAL logs updateLogPosition(entryBatch); @@ -199,8 +204,8 @@ public class ReplicationSourceShipper extends Thread { source.getSourceMetrics().setAgeOfLastShippedOp( entries.get(entries.size() - 1).getKey().getWriteTime(), walGroupId); if (LOG.isTraceEnabled()) { - LOG.trace("Replicated " + entries.size() + " entries or " + entryBatch.getNbOperations() - + " operations in " + ((endTimeNs - startTimeNs) / 1000000) + " ms"); + LOG.trace("Replicated {} entries or {} operations in {} ms", + entries.size(), entryBatch.getNbOperations(), (endTimeNs - startTimeNs) / 1000000); } break; } catch (Exception ex) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java index a3c20d6..c62d1cf 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java @@ -23,7 +23,9 @@ import static org.mockito.Mockito.when; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicBoolean; @@ -316,7 +318,9 @@ public class TestReplicationEndpoint extends TestReplicationBase { MetricsReplicationSourceSource singleSourceSource = new MetricsReplicationSourceSourceImpl(singleRms, id); MetricsReplicationSourceSource globalSourceSource = new MetricsReplicationGlobalSourceSource(globalRms); - MetricsSource source = new MetricsSource(id, singleSourceSource, globalSourceSource); + Map singleSourceSourceByTable = new HashMap<>(); + MetricsSource source = new MetricsSource(id, singleSourceSource, globalSourceSource, + singleSourceSourceByTable); String gaugeName = "gauge"; String singleGaugeName = "source.id." + gaugeName; String globalGaugeName = "source." + gaugeName; @@ -353,6 +357,21 @@ public class TestReplicationEndpoint extends TestReplicationBase { verify(globalRms).setGauge(globalGaugeName, delta); verify(singleRms).updateHistogram(singleCounterName, count); verify(globalRms).updateHistogram(globalCounterName, count); + + //check singleSourceSourceByTable metrics. + // singleSourceSourceByTable map entry will be created only + // after calling #setAgeOfLastShippedOpByTable + boolean containsRandomNewTable = source.getSingleSourceSourceByTable() + .containsKey("RandomNewTable"); + Assert.assertEquals(false, containsRandomNewTable); + source.setAgeOfLastShippedOpByTable(123L, "RandomNewTable"); + containsRandomNewTable = source.getSingleSourceSourceByTable() + .containsKey("RandomNewTable"); + Assert.assertEquals(true, containsRandomNewTable); + MetricsReplicationSourceSource msr = source.getSingleSourceSourceByTable() + .get("RandomNewTable"); + Assert.assertNotEquals(123L, msr.getLastShippedAge()); + } private void doPut(byte[] row) throws IOException { -- 2.7.4