From 09ed33e67aaeb81ede92461505a9174de6113c62 Mon Sep 17 00:00:00 2001 From: Geoffrey Date: Tue, 23 Aug 2016 16:49:24 -0700 Subject: [PATCH] HBASE-16448 - Custom metrics for custom replication endpoints --- .../MetricsReplicationSourceSource.java | 4 +- .../MetricsReplicationGlobalSourceSource.java | 64 +++++++++++++++- .../MetricsReplicationSourceSourceImpl.java | 81 ++++++++++++++++++--- .../replication/regionserver/MetricsSource.java | 85 +++++++++++++++++++++- .../hbase/replication/TestReplicationEndpoint.java | 76 ++++++++++++++++++- 5 files changed, 293 insertions(+), 17 deletions(-) diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java index 8611e15..ea0ae20 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java @@ -18,7 +18,9 @@ package org.apache.hadoop.hbase.replication.regionserver; -public interface MetricsReplicationSourceSource { +import org.apache.hadoop.hbase.metrics.BaseSource; + +public interface MetricsReplicationSourceSource extends BaseSource { public static final String SOURCE_SIZE_OF_LOG_QUEUE = "source.sizeOfLogQueue"; public static final String SOURCE_AGE_OF_LAST_SHIPPED_OP = "source.ageOfLastShippedOp"; diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java index b3e1766..da1bcf4 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java @@ -22,7 +22,7 @@ import org.apache.hadoop.metrics2.lib.MutableCounterLong; import org.apache.hadoop.metrics2.lib.MutableGaugeLong; public class MetricsReplicationGlobalSourceSource implements MetricsReplicationSourceSource { - + private final MetricsReplicationSource rms; private final MutableGaugeLong ageOfLastShippedOpGauge; private final MutableGaugeLong sizeOfLogQueueGauge; private final MutableCounterLong logReadInEditsCounter; @@ -35,7 +35,7 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS private final MutableCounterLong logReadInBytesCounter; public MetricsReplicationGlobalSourceSource(MetricsReplicationSourceImpl rms) { - + this.rms = rms; ageOfLastShippedOpGauge = rms.getMetricsRegistry().getLongGauge(SOURCE_AGE_OF_LAST_SHIPPED_OP, 0L); sizeOfLogQueueGauge = rms.getMetricsRegistry().getLongGauge(SOURCE_SIZE_OF_LOG_QUEUE, 0L); @@ -118,4 +118,64 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS public long getLastShippedAge() { return ageOfLastShippedOpGauge.value(); } + + @Override + public void init() { + rms.init(); + } + + @Override + public void setGauge(String gaugeName, long value) { + rms.setGauge(gaugeName, value); + } + + @Override + public void incGauge(String gaugeName, long delta) { + rms.incGauge(gaugeName, delta); + } + + @Override + public void decGauge(String gaugeName, long delta) { + rms.decGauge(gaugeName, delta); + } + + @Override + public void removeMetric(String key) { + rms.removeMetric(key); + } + + @Override + public void incCounters(String counterName, long delta) { + rms.incCounters(counterName, delta); + } + + @Override + public void updateHistogram(String name, long value) { + rms.updateHistogram(name, value); + } + + @Override + public void updateQuantile(String name, long value) { + rms.updateQuantile(name, value); + } + + @Override + public String getMetricsContext() { + return rms.getMetricsContext(); + } + + @Override + public String getMetricsDescription() { + return rms.getMetricsDescription(); + } + + @Override + public String getMetricsJmxContext() { + return rms.getMetricsJmxContext(); + } + + @Override + public String getMetricsName() { + return rms.getMetricsName(); + } } diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java index d6a9128..55c9b05 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java @@ -31,6 +31,8 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou private final String logEditsFilteredKey; private final String shippedBatchesKey; private final String shippedOpsKey; + private String keyPrefix; + @Deprecated private final String shippedKBsKey; private final String shippedBytesKey; @@ -49,32 +51,33 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou public MetricsReplicationSourceSourceImpl(MetricsReplicationSourceImpl rms, String id) { this.rms = rms; this.id = id; + this.keyPrefix = "source." + this.id + "."; - ageOfLastShippedOpKey = "source." + id + ".ageOfLastShippedOp"; + ageOfLastShippedOpKey = this.keyPrefix + "ageOfLastShippedOp"; ageOfLastShippedOpGauge = rms.getMetricsRegistry().getLongGauge(ageOfLastShippedOpKey, 0L); - sizeOfLogQueueKey = "source." + id + ".sizeOfLogQueue"; + sizeOfLogQueueKey = this.keyPrefix + "sizeOfLogQueue"; sizeOfLogQueueGauge = rms.getMetricsRegistry().getLongGauge(sizeOfLogQueueKey, 0L); - shippedBatchesKey = "source." + this.id + ".shippedBatches"; + shippedBatchesKey = this.keyPrefix + "shippedBatches"; shippedBatchesCounter = rms.getMetricsRegistry().getLongCounter(shippedBatchesKey, 0L); - shippedOpsKey = "source." + this.id + ".shippedOps"; + shippedOpsKey = this.keyPrefix + "shippedOps"; shippedOpsCounter = rms.getMetricsRegistry().getLongCounter(shippedOpsKey, 0L); - shippedKBsKey = "source." + this.id + ".shippedKBs"; + shippedKBsKey = this.keyPrefix + "shippedKBs"; shippedKBsCounter = rms.getMetricsRegistry().getLongCounter(shippedKBsKey, 0L); - shippedBytesKey = "source." + this.id + ".shippedBytes"; + shippedBytesKey = this.keyPrefix + "shippedBytes"; shippedBytesCounter = rms.getMetricsRegistry().getLongCounter(shippedBytesKey, 0L); - logReadInBytesKey = "source." + this.id + ".logReadInBytes"; + logReadInBytesKey = this.keyPrefix + "logReadInBytes"; logReadInBytesCounter = rms.getMetricsRegistry().getLongCounter(logReadInBytesKey, 0L); - logReadInEditsKey = "source." + id + ".logEditsRead"; + logReadInEditsKey = this.keyPrefix + "logEditsRead"; logReadInEditsCounter = rms.getMetricsRegistry().getLongCounter(logReadInEditsKey, 0L); - logEditsFilteredKey = "source." + id + ".logEditsFiltered"; + logEditsFilteredKey = this.keyPrefix + "logEditsFiltered"; logEditsFilteredCounter = rms.getMetricsRegistry().getLongCounter(logEditsFilteredKey, 0L); } @@ -139,4 +142,64 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou public long getLastShippedAge() { return ageOfLastShippedOpGauge.value(); } + + @Override + public void init() { + rms.init(); + } + + @Override + public void setGauge(String gaugeName, long value) { + rms.setGauge(this.keyPrefix + gaugeName, value); + } + + @Override + public void incGauge(String gaugeName, long delta) { + rms.incGauge(this.keyPrefix + gaugeName, delta); + } + + @Override + public void decGauge(String gaugeName, long delta) { + rms.decGauge(this.keyPrefix + gaugeName, delta); + } + + @Override + public void removeMetric(String key) { + rms.removeMetric(this.keyPrefix + key); + } + + @Override + public void incCounters(String counterName, long delta) { + rms.incCounters(this.keyPrefix + counterName, delta); + } + + @Override + public void updateHistogram(String name, long value) { + rms.updateHistogram(this.keyPrefix + name, value); + } + + @Override + public void updateQuantile(String name, long value) { + rms.updateQuantile(this.keyPrefix + name, value); + } + + @Override + public String getMetricsContext() { + return rms.getMetricsContext(); + } + + @Override + public String getMetricsDescription() { + return rms.getMetricsDescription(); + } + + @Override + public String getMetricsJmxContext() { + return rms.getMetricsJmxContext(); + } + + @Override + public String getMetricsName() { + return rms.getMetricsName(); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java index 113bc5b..5824d83 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java @@ -23,6 +23,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.CompatibilitySingletonFactory; import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.metrics.BaseSource; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; /** @@ -30,7 +31,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; * through the metrics interfaces. */ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION) -public class MetricsSource { +public class MetricsSource implements BaseSource { public static final Log LOG = LogFactory.getLog(MetricsSource.class); @@ -74,6 +75,19 @@ public class MetricsSource { } /** + * Constructor for injecting custom (or test) MetricsReplicationSourceSources + * @param id Name of the source this class is monitoring + * @param singleSourceSource Class to monitor id-scoped metrics + * @param globalSourceSource Class to monitor global-scoped metrics + */ + public MetricsSource(String id, MetricsReplicationSourceSource singleSourceSource, + MetricsReplicationSourceSource globalSourceSource) { + this.id = id; + this.singleSourceSource = singleSourceSource; + this.globalSourceSource = globalSourceSource; + } + + /** * Set the age of the last edit that was shipped * * @param timestamp write time of the edit @@ -196,4 +210,73 @@ public class MetricsSource { public String getPeerID() { return id; } + + @Override + public void init() { + singleSourceSource.init(); + globalSourceSource.init(); + } + + @Override + public void setGauge(String gaugeName, long value) { + singleSourceSource.setGauge(gaugeName, value); + globalSourceSource.setGauge(gaugeName, value); + } + + @Override + public void incGauge(String gaugeName, long delta) { + singleSourceSource.incGauge(gaugeName, delta); + globalSourceSource.incGauge(gaugeName, delta); + } + + @Override + public void decGauge(String gaugeName, long delta) { + singleSourceSource.decGauge(gaugeName, delta); + globalSourceSource.decGauge(gaugeName, delta); + } + + @Override + public void removeMetric(String key) { + singleSourceSource.removeMetric(key); + globalSourceSource.removeMetric(key); + } + + @Override + public void incCounters(String counterName, long delta) { + singleSourceSource.incCounters(counterName, delta); + globalSourceSource.incCounters(counterName, delta); + } + + @Override + public void updateHistogram(String name, long value) { + singleSourceSource.updateHistogram(name, value); + globalSourceSource.updateHistogram(name, value); + } + + @Override + public void updateQuantile(String name, long value) { + singleSourceSource.updateQuantile(name, value); + globalSourceSource.updateQuantile(name, value); + } + + @Override + public String getMetricsContext() { + return globalSourceSource.getMetricsContext(); + } + + @Override + public String getMetricsDescription() { + return globalSourceSource.getMetricsDescription(); + } + + @Override + public String getMetricsJmxContext() { + return globalSourceSource.getMetricsJmxContext(); + } + + @Override + public String getMetricsName() { + return globalSourceSource.getMetricsName(); + } + } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java index ae4c8e3..7d7df5c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java @@ -37,6 +37,9 @@ import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; import org.apache.hadoop.hbase.zookeeper.ZKConfig; +import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.replication.regionserver.*; import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; @@ -44,6 +47,10 @@ import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + /** * Tests ReplicationSource and ReplicationEndpoint interactions */ @@ -82,8 +89,8 @@ public class TestReplicationEndpoint extends TestReplicationBase { public void testCustomReplicationEndpoint() throws Exception { // test installing a custom replication endpoint other than the default one. admin.addPeer("testCustomReplicationEndpoint", - new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1)) - .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), null); + new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1)) + .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), null); // check whether the class has been constructed and started Waiter.waitFor(conf1, 60000, new Waiter.Predicate() { @@ -123,8 +130,8 @@ public class TestReplicationEndpoint extends TestReplicationBase { Assert.assertTrue(!ReplicationEndpointReturningFalse.replicated.get()); final String id = "testReplicationEndpointReturnsFalseOnReplicate"; admin.addPeer(id, - new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1)) - .setReplicationEndpointImpl(ReplicationEndpointReturningFalse.class.getName()), null); + new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1)) + .setReplicationEndpointImpl(ReplicationEndpointReturningFalse.class.getName()), null); // now replicate some data. doPut(row); @@ -141,6 +148,66 @@ public class TestReplicationEndpoint extends TestReplicationBase { admin.removePeer("testReplicationEndpointReturnsFalseOnReplicate"); } + @Test + public void testMetricsSourceBaseSourcePassthrough(){ + /* + The replication MetricsSource wraps a MetricsReplicationSourceSourceImpl + and a MetricsReplicationGlobalSourceSource, so that metrics get written to both namespaces. + Both of those classes wrap a MetricsReplicationSourceImpl that implements BaseSource, which + allows for custom JMX metrics. + This test checks to make sure the BaseSource decorator logic on MetricsSource actually calls down through + the two layers of wrapping to the actual BaseSource. + */ + String id = "id"; + DynamicMetricsRegistry mockRegistry = mock(DynamicMetricsRegistry.class); + MetricsReplicationSourceImpl singleRms = mock(MetricsReplicationSourceImpl.class); + when(singleRms.getMetricsRegistry()).thenReturn(mockRegistry); + MetricsReplicationSourceImpl globalRms = mock(MetricsReplicationSourceImpl.class); + when(globalRms.getMetricsRegistry()).thenReturn(mockRegistry); + + MetricsReplicationSourceSource singleSourceSource = new MetricsReplicationSourceSourceImpl(singleRms, id); + MetricsReplicationSourceSource globalSourceSource = new MetricsReplicationGlobalSourceSource(globalRms); + MetricsSource source = new MetricsSource(id, singleSourceSource, globalSourceSource); + String gaugeName = "gauge"; + String singleGaugeName = "source.id." + gaugeName; + long delta = 1; + String counterName = "counter"; + String singleCounterName = "source.id." + counterName; + long count = 2; + source.decGauge(gaugeName, delta); + source.getMetricsContext(); + source.getMetricsDescription(); + source.getMetricsJmxContext(); + source.getMetricsName(); + source.incCounters(counterName, count); + source.incGauge(gaugeName, delta); + source.init(); + source.removeMetric(gaugeName); + source.setGauge(gaugeName, delta); + source.updateHistogram(counterName, count); + source.updateQuantile(counterName, count); + + verify(singleRms).decGauge(singleGaugeName, delta); + verify(globalRms).decGauge(gaugeName, delta); + verify(globalRms).getMetricsContext(); + verify(globalRms).getMetricsJmxContext(); + verify(globalRms).getMetricsName(); + verify(singleRms).incCounters(singleCounterName, count); + verify(globalRms).incCounters(counterName, count); + verify(singleRms).incGauge(singleGaugeName, delta); + verify(globalRms).incGauge(gaugeName, delta); + verify(globalRms).init(); + verify(singleRms).removeMetric(singleGaugeName); + verify(globalRms).removeMetric(gaugeName); + verify(singleRms).setGauge(singleGaugeName, delta); + verify(globalRms).setGauge(gaugeName, delta); + verify(singleRms).updateHistogram(singleCounterName, count); + verify(globalRms).updateHistogram(counterName, count); + verify(singleRms).updateQuantile(singleCounterName, count); + verify(globalRms).updateQuantile(counterName, count); + + } + @Test (timeout=120000) public void testWALEntryFilterFromReplicationEndpoint() throws Exception { admin.addPeer("testWALEntryFilterFromReplicationEndpoint", @@ -253,6 +320,7 @@ public class TestReplicationEndpoint extends TestReplicationBase { return true; } + @Override public WALEntryFilter getWALEntryfilter() { return new ChainWALEntryFilter(super.getWALEntryfilter(), new WALEntryFilter() { -- 2.9.0