diff --git c/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java w/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java index 0b6987e..bb309ff 100644 --- c/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java +++ w/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Row; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.replication.regionserver.metrics.ReplicationSinkMetrics; import org.apache.hadoop.hbase.util.Bytes; /** @@ -133,7 +134,7 @@ public class ReplicationSink { } this.metrics.setAgeOfLastAppliedOp( entries[entries.length-1].getKey().getWriteTime()); - this.metrics.appliedBatchesRate.inc(1); + this.metrics.applyBatch(entries.length); LOG.info("Total replicated: " + totalReplicated); } catch (IOException ex) { LOG.error("Unable to accept edit because:", ex); @@ -173,7 +174,6 @@ public class ReplicationSink { try { table = this.pool.getTable(tableName); table.batch(rows); - this.metrics.appliedOpsRate.inc(rows.size()); } catch (InterruptedException ix) { throw new IOException(ix); } finally { diff --git c/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkMetrics.java w/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkMetrics.java deleted file mode 100644 index bf324e2..0000000 --- c/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkMetrics.java +++ /dev/null @@ -1,82 +0,0 @@ -/** - * Copyright 2010 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.replication.regionserver; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hbase.metrics.MetricsRate; -import org.apache.hadoop.metrics.MetricsContext; -import org.apache.hadoop.metrics.MetricsRecord; -import org.apache.hadoop.metrics.MetricsUtil; -import org.apache.hadoop.metrics.Updater; -import org.apache.hadoop.metrics.jvm.JvmMetrics; -import org.apache.hadoop.metrics.util.MetricsIntValue; -import org.apache.hadoop.metrics.util.MetricsLongValue; -import org.apache.hadoop.metrics.util.MetricsRegistry; - -/** - * This class is for maintaining the various replication statistics - * for a sink and publishing them through the metrics interfaces. - */ -@InterfaceAudience.Private -public class ReplicationSinkMetrics implements Updater { - private final MetricsRecord metricsRecord; - private MetricsRegistry registry = new MetricsRegistry(); - - /** Rate of operations applied by the sink */ - public final MetricsRate appliedOpsRate = - new MetricsRate("appliedOpsRate", registry); - - /** Rate of batches (of operations) applied by the sink */ - public final MetricsRate appliedBatchesRate = - new MetricsRate("appliedBatchesRate", registry); - - /** Age of the last operation that was applied by the sink */ - private final MetricsLongValue ageOfLastAppliedOp = - new MetricsLongValue("ageOfLastAppliedOp", registry); - - /** - * Constructor used to register the metrics - */ - public ReplicationSinkMetrics() { - MetricsContext context = MetricsUtil.getContext("hbase"); - String name = Thread.currentThread().getName(); - metricsRecord = MetricsUtil.createRecord(context, "replication"); - metricsRecord.setTag("RegionServer", name); - context.registerUpdater(this); - // export for JMX - new ReplicationStatistics(this.registry, "ReplicationSink"); - } - - /** - * Set the age of the last edit that was applied - * @param timestamp write time of the edit - */ - public void setAgeOfLastAppliedOp(long timestamp) { - ageOfLastAppliedOp.set(System.currentTimeMillis() - timestamp); - } - @Override - public void doUpdates(MetricsContext metricsContext) { - synchronized (this) { - this.appliedOpsRate.pushMetric(this.metricsRecord); - this.appliedBatchesRate.pushMetric(this.metricsRecord); - this.ageOfLastAppliedOp.pushMetric(this.metricsRecord); - } - this.metricsRecord.update(); - } -} diff --git c/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java w/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 2c328ea..232e09a 100644 --- c/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ w/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -56,6 +56,7 @@ import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.replication.ReplicationZookeeper; +import org.apache.hadoop.hbase.replication.regionserver.metrics.ReplicationSourceMetrics; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.zookeeper.ZKClusterId; @@ -238,7 +239,7 @@ public class ReplicationSource extends Thread @Override public void enqueueLog(Path log) { this.queue.put(log); - this.metrics.sizeOfLogQueue.set(queue.size()); + this.metrics.setSizeOfLogQueue(queue.size()); } @Override @@ -393,7 +394,7 @@ public class ReplicationSource extends Thread HLog.Entry entry = this.reader.next(this.entriesArray[currentNbEntries]); while (entry != null) { WALEdit edit = entry.getEdit(); - this.metrics.logEditsReadRate.inc(1); + this.metrics.incrLogEditsRead(); seenEntries++; // Remove all KVs that should not be replicated HLogKey logKey = entry.getKey(); @@ -415,7 +416,7 @@ public class ReplicationSource extends Thread currentNbOperations += countDistinctRowKeys(edit); currentNbEntries++; } else { - this.metrics.logEditsFilteredRate.inc(1); + this.metrics.incrLogEditsFiltered(); } } // Stop if too many entries or too big @@ -455,7 +456,7 @@ public class ReplicationSource extends Thread try { if (this.currentPath == null) { this.currentPath = queue.poll(this.sleepForRetries, TimeUnit.MILLISECONDS); - this.metrics.sizeOfLogQueue.set(queue.size()); + this.metrics.setSizeOfLogQueue(queue.size()); } } catch (InterruptedException e) { LOG.warn("Interrupted while reading edits", e); @@ -616,9 +617,7 @@ public class ReplicationSource extends Thread this.lastLoggedPosition = this.position; } this.totalReplicatedEdits += currentNbEntries; - this.metrics.shippedBatchesRate.inc(1); - this.metrics.shippedOpsRate.inc( - this.currentNbOperations); + this.metrics.shipBatch(this.currentNbOperations); this.metrics.setAgeOfLastShippedOp( this.entriesArray[this.entriesArray.length-1].getKey().getWriteTime()); LOG.debug("Replicated in total: " + this.totalReplicatedEdits); diff --git c/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceMetrics.java w/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceMetrics.java deleted file mode 100644 index 543e15d..0000000 --- c/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceMetrics.java +++ /dev/null @@ -1,125 +0,0 @@ -/** - * Copyright 2010 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.replication.regionserver; -import java.io.UnsupportedEncodingException; -import java.net.URLEncoder; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hbase.metrics.MetricsRate; -import org.apache.hadoop.metrics.MetricsContext; -import org.apache.hadoop.metrics.MetricsRecord; -import org.apache.hadoop.metrics.MetricsUtil; -import org.apache.hadoop.metrics.Updater; -import org.apache.hadoop.metrics.jvm.JvmMetrics; -import org.apache.hadoop.metrics.util.MetricsIntValue; -import org.apache.hadoop.metrics.util.MetricsLongValue; -import org.apache.hadoop.metrics.util.MetricsRegistry; - -/** - * This class is for maintaining the various replication statistics - * for a source and publishing them through the metrics interfaces. - */ -@InterfaceAudience.Private -public class ReplicationSourceMetrics implements Updater { - private final MetricsRecord metricsRecord; - private MetricsRegistry registry = new MetricsRegistry(); - - /** Rate of shipped operations by the source */ - public final MetricsRate shippedOpsRate = - new MetricsRate("shippedOpsRate", registry); - - /** Rate of shipped batches by the source */ - public final MetricsRate shippedBatchesRate = - new MetricsRate("shippedBatchesRate", registry); - - /** Rate of log entries (can be multiple Puts) read from the logs */ - public final MetricsRate logEditsReadRate = - new MetricsRate("logEditsReadRate", registry); - - /** Rate of log entries filtered by the source */ - public final MetricsRate logEditsFilteredRate = - new MetricsRate("logEditsFilteredRate", registry); - - /** Age of the last operation that was shipped by the source */ - private final MetricsLongValue ageOfLastShippedOp = - new MetricsLongValue("ageOfLastShippedOp", registry); - - /** - * Current size of the queue of logs to replicate, - * excluding the one being processed at the moment - */ - public final MetricsIntValue sizeOfLogQueue = - new MetricsIntValue("sizeOfLogQueue", registry); - - // It's a little dirty to preset the age to now since if we fail - // to replicate the very first time then it will show that age instead - // of nothing (although that might not be good either). - private long lastTimestampForAge = System.currentTimeMillis(); - - /** - * Constructor used to register the metrics - * @param id Name of the source this class is monitoring - */ - public ReplicationSourceMetrics(String id) { - MetricsContext context = MetricsUtil.getContext("hbase"); - String name = Thread.currentThread().getName(); - metricsRecord = MetricsUtil.createRecord(context, "replication"); - metricsRecord.setTag("RegionServer", name); - context.registerUpdater(this); - try { - id = URLEncoder.encode(id, "UTF8"); - } catch (UnsupportedEncodingException e) { - id = "CAN'T ENCODE UTF8"; - } - // export for JMX - new ReplicationStatistics(this.registry, "ReplicationSource for " + id); - } - - /** - * Set the age of the last edit that was shipped - * @param timestamp write time of the edit - */ - public void setAgeOfLastShippedOp(long timestamp) { - lastTimestampForAge = timestamp; - ageOfLastShippedOp.set(System.currentTimeMillis() - lastTimestampForAge); - } - - /** - * Convenience method to use the last given timestamp to refresh the age - * of the last edit. Used when replication fails and need to keep that - * metric accurate. - */ - public void refreshAgeOfLastShippedOp() { - setAgeOfLastShippedOp(lastTimestampForAge); - } - - @Override - public void doUpdates(MetricsContext metricsContext) { - synchronized (this) { - this.shippedOpsRate.pushMetric(this.metricsRecord); - this.shippedBatchesRate.pushMetric(this.metricsRecord); - this.logEditsReadRate.pushMetric(this.metricsRecord); - this.logEditsFilteredRate.pushMetric(this.metricsRecord); - this.ageOfLastShippedOp.pushMetric(this.metricsRecord); - this.sizeOfLogQueue.pushMetric(this.metricsRecord); - } - this.metricsRecord.update(); - } -} diff --git c/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationStatistics.java w/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationStatistics.java deleted file mode 100644 index ceeff5f..0000000 --- c/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationStatistics.java +++ /dev/null @@ -1,47 +0,0 @@ -/** - * Copyright 2010 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.replication.regionserver; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hbase.metrics.MetricsMBeanBase; -import org.apache.hadoop.metrics.util.MBeanUtil; -import org.apache.hadoop.metrics.util.MetricsRegistry; - -import javax.management.ObjectName; - -/** - * Exports metrics recorded by {@link ReplicationSourceMetrics} as an MBean - * for JMX monitoring. - */ -@InterfaceAudience.Private -public class ReplicationStatistics extends MetricsMBeanBase { - - private final ObjectName mbeanName; - - /** - * Constructor to register the MBean - * @param registry which rehistry to use - * @param name name to get to this bean - */ - public ReplicationStatistics(MetricsRegistry registry, String name) { - super(registry, name); - mbeanName = MBeanUtil.registerMBean("Replication", name, this); - } -} diff --git c/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/metrics/ReplicationMetricsSource.java w/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/metrics/ReplicationMetricsSource.java new file mode 100644 index 0000000..d2f0b91 --- /dev/null +++ w/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/metrics/ReplicationMetricsSource.java @@ -0,0 +1,62 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication.regionserver.metrics; + + +import org.apache.hadoop.metrics2.MetricsBuilder; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MetricMutableCounterLong; +import org.apache.hadoop.metrics2.lib.MetricMutableGaugeLong; + +import java.util.Map; + +/** + * + */ +public class ReplicationMetricsSource implements MetricsSource { + + private static ReplicationMetricsSource INSTANCE = new ReplicationMetricsSource(); + + + + public static ReplicationMetricsSource getInstance() { + return INSTANCE; + } + + protected ReplicationMetricsSource() { + DefaultMetricsSystem.registerSource("Replication", "Replication", this); + } + + @Override + public void getMetrics(MetricsBuilder metricsBuilder, boolean all) { + for(Map.Entry entry:ReplicationMetricsStorage.counters.entrySet()) { + MetricsRecordBuilder rb = metricsBuilder.addRecord(entry.getKey()); + entry.getValue().snapshot(rb, all); + } + for(Map.Entry entry:ReplicationMetricsStorage.gauges.entrySet()) { + MetricsRecordBuilder rb = metricsBuilder.addRecord(entry.getKey()); + entry.getValue().snapshot(rb, all); + } + + + } +} diff --git c/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/metrics/ReplicationMetricsStorage.java w/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/metrics/ReplicationMetricsStorage.java new file mode 100644 index 0000000..ef91613 --- /dev/null +++ w/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/metrics/ReplicationMetricsStorage.java @@ -0,0 +1,73 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication.regionserver.metrics; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.apache.hadoop.metrics2.lib.MetricMutableCounterLong; +import org.apache.hadoop.metrics2.lib.MetricMutableGaugeLong; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; + +/** + * + */ +public class ReplicationMetricsStorage { + + static ConcurrentMap gauges = new ConcurrentHashMap(); + static ConcurrentMap counters = new ConcurrentHashMap(); + + public static void setGauge(String gaugeName, long value) { + MetricMutableGaugeLong gaugeInt = gauges.get(gaugeName); + if (gaugeInt == null) { + gaugeInt = gauges.putIfAbsent(gaugeName, new MetricMutableGaugeLong(gaugeName, "", value)); + } + gaugeInt.set(value); + } + + public static void incGauge(String gaugeName, long delta) { + MetricMutableGaugeLong gaugeInt = gauges.get(gaugeName); + if (gaugeInt == null) { + gaugeInt = gauges.putIfAbsent(gaugeName, new MetricMutableGaugeLong(gaugeName, "", 0)); + } + gaugeInt.incr(delta); + } + + public static void decGauge(String gaugeName, long delta) { + MetricMutableGaugeLong gaugeInt = gauges.get(gaugeName); + if (gaugeInt == null) { + gaugeInt = gauges.putIfAbsent(gaugeName, new MetricMutableGaugeLong(gaugeName, "", 0)); + } + gaugeInt.decr(delta); + } + + public static void incCounters(String key, long delta) { + MetricMutableCounterLong counter = counters.get(key); + if (counter == null) { + counter = counters.putIfAbsent(key, new MetricMutableCounterLong(key, "", 0)); + } + counter.incr(delta); + + } + + public static void removeGauge(String key) { + gauges.remove(key); + } +} diff --git c/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/metrics/ReplicationSinkMetrics.java w/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/metrics/ReplicationSinkMetrics.java new file mode 100644 index 0000000..535c27f --- /dev/null +++ w/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/metrics/ReplicationSinkMetrics.java @@ -0,0 +1,43 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication.regionserver.metrics; +import org.apache.hadoop.classification.InterfaceAudience; + + +/** + * This class is for maintaining the various replication statistics + * for a sink and publishing them through the metrics interfaces. + */ +@InterfaceAudience.Private +public class ReplicationSinkMetrics { + + public static final String SINK_AGE_OF_LAST_APPLIED_OP = "sink.ageOfLastAppliedOp"; + + public void setAgeOfLastAppliedOp(long timestamp) { + long age = System.currentTimeMillis() - timestamp; + ReplicationMetricsStorage.setGauge(SINK_AGE_OF_LAST_APPLIED_OP, age); + } + + public void applyBatch(long batchSize) { + ReplicationMetricsStorage.incCounters("sink.appliedBatches", 1); + ReplicationMetricsStorage.incCounters("sink.appliedOps", batchSize); + } + +} diff --git c/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/metrics/ReplicationSourceMetrics.java w/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/metrics/ReplicationSourceMetrics.java new file mode 100644 index 0000000..424bca9 --- /dev/null +++ w/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/metrics/ReplicationSourceMetrics.java @@ -0,0 +1,122 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication.regionserver.metrics; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * This class is for maintaining the various replication statistics + * for a source and publishing them through the metrics interfaces. + */ +@InterfaceAudience.Private +public class ReplicationSourceMetrics { + + public static final String SOURCE_SIZE_OF_LOG_QUEUE = "source.sizeOfLogQueue"; + public static final String SOURCE_AGE_OF_LAST_SHIPPED_OP = "source.ageOfLastShippedOp"; + public static final String SOURCE_LOG_EDITS_READ = "source.logEditsRead"; + public static final String SOURCE_LOG_EDITS_FILTERED = "source.logEditsFiltered"; + public static final String SOURCE_SHIPPED_BATCHES = "source.shippedBatches"; + public static final String SOURCE_SHIPPED_OPS = "source.shippedOps"; + private String id; + + private long lastTimestamp = 0; + private int lastQueueSize = 0; + + private String sizeOfLogQueKey; + private String ageOfLastShippedOpKey; + private String logEditsReadKey; + private String logEditsFilteredKey; + private final String shippedBatchesKey; + private final String shippedOpsKey; + + /** + * Constructor used to register the metrics + * @param id Name of the source this class is monitoring + */ + public ReplicationSourceMetrics(String id) { + this.id = id; + + sizeOfLogQueKey = "source."+id+".sizeOfLogQueue"; + ageOfLastShippedOpKey = "source."+id+".ageOfLastShippedOp"; + logEditsReadKey = "source."+id+".logEditsRead"; + logEditsFilteredKey = "source."+id+".logEditsFiltered"; + shippedBatchesKey = "source." + this.id + ".shippedBatches"; + shippedOpsKey = "source." + this.id + ".shippedOps"; + } + + /** + * Set the age of the last edit that was shipped + * @param timestamp write time of the edit + */ + public void setAgeOfLastShippedOp(long timestamp) { + long age = System.currentTimeMillis() - timestamp; + ReplicationMetricsStorage.setGauge(ageOfLastShippedOpKey, age); + ReplicationMetricsStorage.setGauge(SOURCE_AGE_OF_LAST_SHIPPED_OP, age); + this.lastTimestamp = timestamp; + } + + /** + * Convenience method to use the last given timestamp to refresh the age + * of the last edit. Used when replication fails and need to keep that + * metric accurate. + */ + public void refreshAgeOfLastShippedOp() { + if (this.lastTimestamp > 0) { + setAgeOfLastShippedOp(this.lastTimestamp); + } + } + + public void setSizeOfLogQueue(int size) { + ReplicationMetricsStorage.setGauge(sizeOfLogQueKey, size); + ReplicationMetricsStorage.incGauge(SOURCE_SIZE_OF_LOG_QUEUE, size - lastQueueSize); + lastQueueSize = size; + } + + private void incrLogEditsRead(long delta) { + ReplicationMetricsStorage.incCounters(logEditsReadKey, delta); + ReplicationMetricsStorage.incCounters(SOURCE_LOG_EDITS_READ, delta); + } + + public void incrLogEditsRead() { + incrLogEditsRead(1); + } + + private void incrLogEditsFiltered(long delta) { + ReplicationMetricsStorage.incCounters(logEditsFilteredKey, delta); + ReplicationMetricsStorage.incCounters(SOURCE_LOG_EDITS_FILTERED, delta); + } + + public void incrLogEditsFiltered() { + incrLogEditsRead(1); + } + + public void shipBatch(long batchSize) { + ReplicationMetricsStorage.incCounters(shippedBatchesKey, 1); + ReplicationMetricsStorage.incCounters(SOURCE_SHIPPED_BATCHES, 1); + ReplicationMetricsStorage.incCounters(shippedOpsKey, batchSize); + ReplicationMetricsStorage.incCounters(SOURCE_SHIPPED_OPS, batchSize); + } + + public void clear() { + ReplicationMetricsStorage.removeGauge(sizeOfLogQueKey); + ReplicationMetricsStorage.decGauge(SOURCE_SIZE_OF_LOG_QUEUE, lastQueueSize); + ReplicationMetricsStorage.removeGauge(ageOfLastShippedOpKey); + } +}