diff --git conf/hadoop-metrics2.properties conf/hadoop-metrics2.properties new file mode 100644 index 0000000..451ec33 --- /dev/null +++ conf/hadoop-metrics2.properties @@ -0,0 +1,5 @@ +# syntax: [prefix].[source|sink|jmx].[instance].[options] +# See package.html for org.apache.hadoop.metrics2 for details + +*.sink.file.class=org.apache.hadoop.metrics2.sink.FileSink + diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java index 0b6987e..bb309ff 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Row; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.replication.regionserver.metrics.ReplicationSinkMetrics; import org.apache.hadoop.hbase.util.Bytes; /** @@ -133,7 +134,7 @@ public class ReplicationSink { } this.metrics.setAgeOfLastAppliedOp( entries[entries.length-1].getKey().getWriteTime()); - this.metrics.appliedBatchesRate.inc(1); + this.metrics.applyBatch(entries.length); LOG.info("Total replicated: " + totalReplicated); } catch (IOException ex) { LOG.error("Unable to accept edit because:", ex); @@ -173,7 +174,6 @@ public class ReplicationSink { try { table = this.pool.getTable(tableName); table.batch(rows); - this.metrics.appliedOpsRate.inc(rows.size()); } catch (InterruptedException ix) { throw new IOException(ix); } finally { diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkMetrics.java hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkMetrics.java deleted file mode 100644 index bf324e2..0000000 --- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkMetrics.java +++ /dev/null @@ -1,82 +0,0 @@ -/** - * Copyright 2010 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase.replication.regionserver; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hbase.metrics.MetricsRate; -import org.apache.hadoop.metrics.MetricsContext; -import org.apache.hadoop.metrics.MetricsRecord; -import org.apache.hadoop.metrics.MetricsUtil; -import org.apache.hadoop.metrics.Updater; -import org.apache.hadoop.metrics.jvm.JvmMetrics; -import org.apache.hadoop.metrics.util.MetricsIntValue; -import org.apache.hadoop.metrics.util.MetricsLongValue; -import org.apache.hadoop.metrics.util.MetricsRegistry; - -/** - * This class is for maintaining the various replication statistics - * for a sink and publishing them through the metrics interfaces. - */ -@InterfaceAudience.Private -public class ReplicationSinkMetrics implements Updater { - private final MetricsRecord metricsRecord; - private MetricsRegistry registry = new MetricsRegistry(); - - /** Rate of operations applied by the sink */ - public final MetricsRate appliedOpsRate = - new MetricsRate("appliedOpsRate", registry); - - /** Rate of batches (of operations) applied by the sink */ - public final MetricsRate appliedBatchesRate = - new MetricsRate("appliedBatchesRate", registry); - - /** Age of the last operation that was applied by the sink */ - private final MetricsLongValue ageOfLastAppliedOp = - new MetricsLongValue("ageOfLastAppliedOp", registry); - - /** - * Constructor used to register the metrics - */ - public ReplicationSinkMetrics() { - MetricsContext context = MetricsUtil.getContext("hbase"); - String name = Thread.currentThread().getName(); - metricsRecord = MetricsUtil.createRecord(context, "replication"); - metricsRecord.setTag("RegionServer", name); - context.registerUpdater(this); - // export for JMX - new ReplicationStatistics(this.registry, "ReplicationSink"); - } - - /** - * Set the age of the last edit that was applied - * @param timestamp write time of the edit - */ - public void setAgeOfLastAppliedOp(long timestamp) { - ageOfLastAppliedOp.set(System.currentTimeMillis() - timestamp); - } - @Override - public void doUpdates(MetricsContext metricsContext) { - synchronized (this) { - this.appliedOpsRate.pushMetric(this.metricsRecord); - this.appliedBatchesRate.pushMetric(this.metricsRecord); - this.ageOfLastAppliedOp.pushMetric(this.metricsRecord); - } - this.metricsRecord.update(); - } -} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 2c328ea..ddca9d1 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -56,6 +56,7 @@ import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.replication.ReplicationZookeeper; +import org.apache.hadoop.hbase.replication.regionserver.metrics.ReplicationSourceMetrics; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.zookeeper.ZKClusterId; @@ -238,7 +239,7 @@ public class ReplicationSource extends Thread @Override public void enqueueLog(Path log) { this.queue.put(log); - this.metrics.sizeOfLogQueue.set(queue.size()); + this.metrics.setSizeOfLogQueue(queue.size()); } @Override @@ 
-246,6 +247,7 @@ public class ReplicationSource extends Thread connectToPeers(); // We were stopped while looping to connect to sinks, just abort if (!this.isActive()) { + metrics.clear(); return; } // delay this until we are in an asynchronous thread @@ -376,6 +378,7 @@ public class ReplicationSource extends Thread } } LOG.debug("Source exiting " + peerId); + metrics.clear(); } /** @@ -393,7 +396,7 @@ public class ReplicationSource extends Thread HLog.Entry entry = this.reader.next(this.entriesArray[currentNbEntries]); while (entry != null) { WALEdit edit = entry.getEdit(); - this.metrics.logEditsReadRate.inc(1); + this.metrics.incrLogEditsRead(); seenEntries++; // Remove all KVs that should not be replicated HLogKey logKey = entry.getKey(); @@ -415,7 +418,7 @@ public class ReplicationSource extends Thread currentNbOperations += countDistinctRowKeys(edit); currentNbEntries++; } else { - this.metrics.logEditsFilteredRate.inc(1); + this.metrics.incrLogEditsFiltered(); } } // Stop if too many entries or too big @@ -455,7 +458,7 @@ public class ReplicationSource extends Thread try { if (this.currentPath == null) { this.currentPath = queue.poll(this.sleepForRetries, TimeUnit.MILLISECONDS); - this.metrics.sizeOfLogQueue.set(queue.size()); + this.metrics.setSizeOfLogQueue(queue.size()); } } catch (InterruptedException e) { LOG.warn("Interrupted while reading edits", e); @@ -616,9 +619,7 @@ public class ReplicationSource extends Thread this.lastLoggedPosition = this.position; } this.totalReplicatedEdits += currentNbEntries; - this.metrics.shippedBatchesRate.inc(1); - this.metrics.shippedOpsRate.inc( - this.currentNbOperations); + this.metrics.shipBatch(this.currentNbOperations); this.metrics.setAgeOfLastShippedOp( this.entriesArray[this.entriesArray.length-1].getKey().getWriteTime()); LOG.debug("Replicated in total: " + this.totalReplicatedEdits); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceMetrics.java hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceMetrics.java deleted file mode 100644 index 543e15d..0000000 --- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceMetrics.java +++ /dev/null @@ -1,125 +0,0 @@ -/** - * Copyright 2010 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase.replication.regionserver; -import java.io.UnsupportedEncodingException; -import java.net.URLEncoder; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hbase.metrics.MetricsRate; -import org.apache.hadoop.metrics.MetricsContext; -import org.apache.hadoop.metrics.MetricsRecord; -import org.apache.hadoop.metrics.MetricsUtil; -import org.apache.hadoop.metrics.Updater; -import org.apache.hadoop.metrics.jvm.JvmMetrics; -import org.apache.hadoop.metrics.util.MetricsIntValue; -import org.apache.hadoop.metrics.util.MetricsLongValue; -import org.apache.hadoop.metrics.util.MetricsRegistry; - -/** - * This class is for maintaining the various replication statistics - * for a source and publishing them through the metrics interfaces. - */ -@InterfaceAudience.Private -public class ReplicationSourceMetrics implements Updater { - private final MetricsRecord metricsRecord; - private MetricsRegistry registry = new MetricsRegistry(); - - /** Rate of shipped operations by the source */ - public final MetricsRate shippedOpsRate = - new MetricsRate("shippedOpsRate", registry); - - /** Rate of shipped batches by the source */ - public final MetricsRate shippedBatchesRate = - new MetricsRate("shippedBatchesRate", registry); - - /** Rate of log entries (can be multiple Puts) read from the logs */ - public final MetricsRate logEditsReadRate = - new MetricsRate("logEditsReadRate", registry); - - /** Rate of log entries filtered by the source */ - public final MetricsRate logEditsFilteredRate = - new MetricsRate("logEditsFilteredRate", registry); - - /** Age of the last operation that was shipped by the source */ - private final MetricsLongValue ageOfLastShippedOp = - new MetricsLongValue("ageOfLastShippedOp", registry); - - /** - * Current size of the queue of logs to replicate, - * excluding the one being processed at the moment - */ - public final MetricsIntValue sizeOfLogQueue = - new MetricsIntValue("sizeOfLogQueue", registry); - - // It's a little dirty to preset the age to now since if we fail - // to replicate the very first time then it will show that age instead - // of nothing (although that might not be good either). - private long lastTimestampForAge = System.currentTimeMillis(); - - /** - * Constructor used to register the metrics - * @param id Name of the source this class is monitoring - */ - public ReplicationSourceMetrics(String id) { - MetricsContext context = MetricsUtil.getContext("hbase"); - String name = Thread.currentThread().getName(); - metricsRecord = MetricsUtil.createRecord(context, "replication"); - metricsRecord.setTag("RegionServer", name); - context.registerUpdater(this); - try { - id = URLEncoder.encode(id, "UTF8"); - } catch (UnsupportedEncodingException e) { - id = "CAN'T ENCODE UTF8"; - } - // export for JMX - new ReplicationStatistics(this.registry, "ReplicationSource for " + id); - } - - /** - * Set the age of the last edit that was shipped - * @param timestamp write time of the edit - */ - public void setAgeOfLastShippedOp(long timestamp) { - lastTimestampForAge = timestamp; - ageOfLastShippedOp.set(System.currentTimeMillis() - lastTimestampForAge); - } - - /** - * Convenience method to use the last given timestamp to refresh the age - * of the last edit. Used when replication fails and need to keep that - * metric accurate. 
- */ - public void refreshAgeOfLastShippedOp() { - setAgeOfLastShippedOp(lastTimestampForAge); - } - - @Override - public void doUpdates(MetricsContext metricsContext) { - synchronized (this) { - this.shippedOpsRate.pushMetric(this.metricsRecord); - this.shippedBatchesRate.pushMetric(this.metricsRecord); - this.logEditsReadRate.pushMetric(this.metricsRecord); - this.logEditsFilteredRate.pushMetric(this.metricsRecord); - this.ageOfLastShippedOp.pushMetric(this.metricsRecord); - this.sizeOfLogQueue.pushMetric(this.metricsRecord); - } - this.metricsRecord.update(); - } -} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationStatistics.java hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationStatistics.java deleted file mode 100644 index ceeff5f..0000000 --- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationStatistics.java +++ /dev/null @@ -1,47 +0,0 @@ -/** - * Copyright 2010 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.replication.regionserver; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hbase.metrics.MetricsMBeanBase; -import org.apache.hadoop.metrics.util.MBeanUtil; -import org.apache.hadoop.metrics.util.MetricsRegistry; - -import javax.management.ObjectName; - -/** - * Exports metrics recorded by {@link ReplicationSourceMetrics} as an MBean - * for JMX monitoring. - */ -@InterfaceAudience.Private -public class ReplicationStatistics extends MetricsMBeanBase { - - private final ObjectName mbeanName; - - /** - * Constructor to register the MBean - * @param registry which rehistry to use - * @param name name to get to this bean - */ - public ReplicationStatistics(MetricsRegistry registry, String name) { - super(registry, name); - mbeanName = MBeanUtil.registerMBean("Replication", name, this); - } -} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/metrics/ReplicationMetricsSource.java hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/metrics/ReplicationMetricsSource.java new file mode 100644 index 0000000..5f5d835 --- /dev/null +++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/metrics/ReplicationMetricsSource.java @@ -0,0 +1,195 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication.regionserver.metrics; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.metrics2.MetricsBuilder; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MetricMutableCounterLong; +import org.apache.hadoop.metrics2.lib.MetricMutableGaugeLong; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * This is the class that will export the metrics for ReplicationSources and ReplicationSinks. + * There should only ever be one of these instantiated at a time. Use the getInstance method to get + * the singleton. + */ +@InterfaceAudience.Private +public class ReplicationMetricsSource implements MetricsSource { + + private static ReplicationMetricsSource INSTANCE = new ReplicationMetricsSource(); + public static final Log LOG = LogFactory.getLog(ReplicationMetricsSource.class); + public static final String METRICS_NAME = "ReplicationMetrics"; + private static final String METRICS_CONTEXT = "replicationmetrics"; + + public static ReplicationMetricsSource getInstance() { + return INSTANCE; + } + + private ConcurrentMap<String, MetricMutableGaugeLong> + gauges = new ConcurrentHashMap<String, MetricMutableGaugeLong>(); + private ConcurrentMap<String, MetricMutableCounterLong> counters = + new ConcurrentHashMap<String, MetricMutableCounterLong>(); + + protected ReplicationMetricsSource() { + DefaultMetricsSystem.initialize("hbase"); + DefaultMetricsSystem.registerSource(METRICS_NAME, "Metrics about hbase replication", this); + } + + /** + * Set a single gauge to a value. + * + * @param gaugeName gauge name + * @param value the new value of the gauge. + */ + public void setGauge(String gaugeName, long value) { + MetricMutableGaugeLong gaugeInt = getLongGauge(gaugeName, value); + gaugeInt.set(value); + } + + /** + * Add some amount to a gauge. + * + * @param gaugeName The name of the gauge to increment. + * @param delta The amount to increment the gauge by. + */ + public void incGauge(String gaugeName, long delta) { + MetricMutableGaugeLong gaugeInt = getLongGauge(gaugeName, 0l); + gaugeInt.incr(delta); + } + + /** + * Decrease the value of a named gauge. + * + * @param gaugeName The name of the gauge. + * @param delta the amount to subtract from the gauge value. + */ + public void decGauge(String gaugeName, long delta) { + MetricMutableGaugeLong gaugeInt = getLongGauge(gaugeName, 0l); + gaugeInt.decr(delta); + } + + /** + * Increment a named counter by some value. + * + * @param key the name of the counter + * @param delta the amount to increment by + */ + public void incCounters(String key, long delta) { + MetricMutableCounterLong counter = getLongCounter(key, 0l); + counter.incr(delta); + + } + + /** + * Remove a named gauge. + * + * @param key the name of the gauge to remove + */ + public void removeGauge(String key) { + gauges.remove(key); + } + + /** + * Remove a named counter.
+ * + * @param key the name of the counter to remove + */ + public void removeCounter(String key) { + counters.remove(key); + } + + /** + * Method to export all the metrics. + * + * @param metricsBuilder Builder to accept metrics + * @param all push all or only changed? + */ + @Override + public void getMetrics(MetricsBuilder metricsBuilder, boolean all) { + + MetricsRecordBuilder rb = + metricsBuilder.addRecord(METRICS_NAME).setContext(METRICS_CONTEXT); + + for (Map.Entry<String, MetricMutableCounterLong> entry : counters.entrySet()) { + entry.getValue().snapshot(rb, all); + } + for (Map.Entry<String, MetricMutableGaugeLong> entry : gauges.entrySet()) { + entry.getValue().snapshot(rb, all); + } + + } + + /** + * Get a MetricMutableGaugeLong from the storage. If it is not there, atomically put it. + * + * @param gaugeName name of the gauge to create or get. + * @param potentialStartingValue value of the new gauge if we have to create it. + * @return the gauge, created if it was not already present + */ + private MetricMutableGaugeLong getLongGauge(String gaugeName, long potentialStartingValue) { + //Try and get the gauge. + MetricMutableGaugeLong gaugeInt = gauges.get(gaugeName); + + //If it's not there then try and put a new one in the storage. + if (gaugeInt == null) { + + //Create the potential new gauge. + MetricMutableGaugeLong newGauge = new MetricMutableGaugeLong(gaugeName, "", + potentialStartingValue); + + // Try and put the gauge in. This is atomic. + gaugeInt = gauges.putIfAbsent(gaugeName, newGauge); + + //If the value we get back is null then the put was successful and we return the new gauge. + //Otherwise gaugeInt holds the gauge that was registered before the put could complete. + if (gaugeInt == null) { + gaugeInt = newGauge; + } + } + return gaugeInt; + } + + /** + * Get a MetricMutableCounterLong from the storage. If it is not there, atomically put it. + * + * @param counterName Name of the counter to get + * @param potentialStartingValue starting value if we have to create a new counter + * @return the counter, created if it was not already present + */ + private MetricMutableCounterLong getLongCounter(String counterName, long potentialStartingValue) { + //See getLongGauge for description on how this works. + MetricMutableCounterLong counter = counters.get(counterName); + if (counter == null) { + MetricMutableCounterLong newCounter = + new MetricMutableCounterLong(counterName, "", potentialStartingValue); + counter = counters.putIfAbsent(counterName, newCounter); + if (counter == null) { + counter = newCounter; + } + } + return counter; + } +} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/metrics/ReplicationSinkMetrics.java hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/metrics/ReplicationSinkMetrics.java new file mode 100644 index 0000000..a0fd095 --- /dev/null +++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/metrics/ReplicationSinkMetrics.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication.regionserver.metrics; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * This class is for maintaining the various replication statistics for a sink and publishing them + * through the metrics interfaces. + */ +@InterfaceAudience.Private +public class ReplicationSinkMetrics { + + public static final String SINK_AGE_OF_LAST_APPLIED_OP = "sink.ageOfLastAppliedOp"; + public static final String SINK_APPLIED_BATCHES = "sink.appliedBatches"; + public static final String SINK_APPLIED_OPS = "sink.appliedOps"; + + private ReplicationMetricsSource rms; + + public ReplicationSinkMetrics() { + rms = ReplicationMetricsSource.getInstance(); + } + + /** + * Set the age of the last applied operation. + * + * @param timestamp The timestamp of the last operation applied. + */ + public void setAgeOfLastAppliedOp(long timestamp) { + long age = System.currentTimeMillis() - timestamp; + rms.setGauge(SINK_AGE_OF_LAST_APPLIED_OP, age); + } + + /** + * Convenience method to update metrics when a batch of operations is applied. + * + * @param batchSize the number of operations applied in the batch + */ + public void applyBatch(long batchSize) { + rms.incCounters(SINK_APPLIED_BATCHES, 1); + rms.incCounters(SINK_APPLIED_OPS, batchSize); + } + +} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/metrics/ReplicationSourceMetrics.java hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/metrics/ReplicationSourceMetrics.java new file mode 100644 index 0000000..e54be7f --- /dev/null +++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/metrics/ReplicationSourceMetrics.java @@ -0,0 +1,156 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication.regionserver.metrics; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * This class is for maintaining the various replication statistics for a source and publishing them + * through the metrics interfaces.
+ */ +@InterfaceAudience.Private +public class ReplicationSourceMetrics { + + public static final String SOURCE_SIZE_OF_LOG_QUEUE = "source.sizeOfLogQueue"; + public static final String SOURCE_AGE_OF_LAST_SHIPPED_OP = "source.ageOfLastShippedOp"; + public static final String SOURCE_LOG_EDITS_READ = "source.logEditsRead"; + public static final String SOURCE_LOG_EDITS_FILTERED = "source.logEditsFiltered"; + public static final String SOURCE_SHIPPED_BATCHES = "source.shippedBatches"; + public static final String SOURCE_SHIPPED_OPS = "source.shippedOps"; + + public static final Log LOG = LogFactory.getLog(ReplicationSourceMetrics.class); + private String id; + + private long lastTimestamp = 0; + private int lastQueueSize = 0; + + private String sizeOfLogQueKey; + private String ageOfLastShippedOpKey; + private String logEditsReadKey; + private String logEditsFilteredKey; + private final String shippedBatchesKey; + private final String shippedOpsKey; + + private ReplicationMetricsSource rms; + + /** + * Constructor used to register the metrics. + * + * @param id Name of the source this class is monitoring + */ + public ReplicationSourceMetrics(String id) { + this.id = id; + + sizeOfLogQueKey = "source." + id + ".sizeOfLogQueue"; + ageOfLastShippedOpKey = "source." + id + ".ageOfLastShippedOp"; + logEditsReadKey = "source." + id + ".logEditsRead"; + logEditsFilteredKey = "source." + id + ".logEditsFiltered"; + shippedBatchesKey = "source." + this.id + ".shippedBatches"; + shippedOpsKey = "source." + this.id + ".shippedOps"; + rms = ReplicationMetricsSource.getInstance(); + } + + /** + * Set the age of the last edit that was shipped. + * + * @param timestamp write time of the edit + */ + public void setAgeOfLastShippedOp(long timestamp) { + long age = System.currentTimeMillis() - timestamp; + rms.setGauge(ageOfLastShippedOpKey, age); + rms.setGauge(SOURCE_AGE_OF_LAST_SHIPPED_OP, age); + this.lastTimestamp = timestamp; + } + + /** + * Convenience method to use the last given timestamp to refresh the age of the last edit. Used + * when replication fails and we need to keep that metric accurate. + */ + public void refreshAgeOfLastShippedOp() { + if (this.lastTimestamp > 0) { + setAgeOfLastShippedOp(this.lastTimestamp); + } + } + + /** + * Set the size of the log queue. + * + * @param size the current size of the queue. + */ + public void setSizeOfLogQueue(int size) { + rms.setGauge(sizeOfLogQueKey, size); + rms.incGauge(SOURCE_SIZE_OF_LOG_QUEUE, size - lastQueueSize); + lastQueueSize = size; + } + + /** + * Add to the number of log edits read. + * + * @param delta the number of log edits read. + */ + private void incrLogEditsRead(long delta) { + rms.incCounters(logEditsReadKey, delta); + rms.incCounters(SOURCE_LOG_EDITS_READ, delta); + } + + /** Increment the number of log edits read by one. */ + public void incrLogEditsRead() { + incrLogEditsRead(1); + } + + /** + * Add to the number of log edits filtered. + * + * @param delta the number filtered. + */ + private void incrLogEditsFiltered(long delta) { + rms.incCounters(logEditsFilteredKey, delta); + rms.incCounters(SOURCE_LOG_EDITS_FILTERED, delta); + } + + /** Increment the number of log edits filtered by one. */ + public void incrLogEditsFiltered() { + incrLogEditsFiltered(1); + } + + /** + * Convenience method to apply changes to metrics due to shipping a batch of logs. + * + * @param batchSize the size of the batch that was shipped to sinks.
+ */ + public void shipBatch(long batchSize) { + rms.incCounters(shippedBatchesKey, 1); + rms.incCounters(SOURCE_SHIPPED_BATCHES, 1); + rms.incCounters(shippedOpsKey, batchSize); + rms.incCounters(SOURCE_SHIPPED_OPS, batchSize); + } + + /** Removes all metrics about this Source. */ + public void clear() { + rms.removeGauge(sizeOfLogQueKey); + rms.decGauge(SOURCE_SIZE_OF_LOG_QUEUE, lastQueueSize); + lastQueueSize = 0; + rms.removeGauge(ageOfLastShippedOpKey); + + rms.removeCounter(logEditsFilteredKey); + rms.removeCounter(logEditsReadKey); + + } +}
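Reviewer note (illustrative sketch, not part of the patch): the snippet below shows how the classes introduced above are meant to be driven. Both ReplicationSourceMetrics and ReplicationSinkMetrics delegate to the ReplicationMetricsSource singleton, which registers itself with the metrics2 system under the "hbase" prefix, so any configured sink (for example the FileSink enabled in conf/hadoop-metrics2.properties) will pick the values up on its next snapshot. The class name ReplicationMetricsExample and the source id "1" are made-up examples; in the patch the id comes from the ReplicationSource that owns the metrics.

// Illustrative usage only -- not part of the patch. Assumes the patched
// hbase-server classes and the Hadoop metrics2 jars are on the classpath.
import org.apache.hadoop.hbase.replication.regionserver.metrics.ReplicationSinkMetrics;
import org.apache.hadoop.hbase.replication.regionserver.metrics.ReplicationSourceMetrics;

public class ReplicationMetricsExample {
  public static void main(String[] args) {
    // One wrapper per replication source; the id ("1" here) is an example value.
    ReplicationSourceMetrics sourceMetrics = new ReplicationSourceMetrics("1");
    ReplicationSinkMetrics sinkMetrics = new ReplicationSinkMetrics();

    // Reading two WAL edits, filtering one, and shipping a batch of five operations
    // updates both the per-source keys (source.1.*) and the global source.* keys.
    sourceMetrics.setSizeOfLogQueue(3);
    sourceMetrics.incrLogEditsRead();
    sourceMetrics.incrLogEditsRead();
    sourceMetrics.incrLogEditsFiltered();
    sourceMetrics.shipBatch(5);
    sourceMetrics.setAgeOfLastShippedOp(System.currentTimeMillis());

    // The sink side records the same batch as applied.
    sinkMetrics.applyBatch(5);
    sinkMetrics.setAgeOfLastAppliedOp(System.currentTimeMillis());

    // When a source shuts down, its per-source gauges are removed and the global
    // sizeOfLogQueue gauge is decremented by the last reported queue size.
    sourceMetrics.clear();
  }
}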