diff --git src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 3831bba..8315c3a 100644 --- src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -458,6 +458,7 @@ public class ReplicationSource extends Thread throws IOException{ long seenEntries = 0; this.repLogReader.seek(); + long persitionBeforeRead = this.repLogReader.getPosition(); HLog.Entry entry = this.repLogReader.readNextAndSetPosition(); while (entry != null) { @@ -508,6 +509,8 @@ public class ReplicationSource extends Thread break; } } + this.metrics.logReadRateInByte + .inc((int) (this.repLogReader.getPosition() - persitionBeforeRead)); LOG.debug("currentNbOperations:" + currentNbOperations + " and seenEntries:" + seenEntries + " and size: " + this.currentSize); diff --git src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceMetrics.java src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceMetrics.java index da0905c..e32a3bc 100644 --- src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceMetrics.java +++ src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceMetrics.java @@ -66,6 +66,9 @@ public class ReplicationSourceMetrics implements Updater { */ public final MetricsIntValue sizeOfLogQueue = new MetricsIntValue("sizeOfLogQueue", registry); + + /** Rate of log entries read by the source */ + public MetricsRate logReadRateInByte = new MetricsRate("logReadRateInByte", registry); // It's a little dirty to preset the age to now since if we fail // to replicate the very first time then it will show that age instead @@ -119,6 +122,7 @@ public class ReplicationSourceMetrics implements Updater { this.logEditsFilteredRate.pushMetric(this.metricsRecord); this.ageOfLastShippedOp.pushMetric(this.metricsRecord); this.sizeOfLogQueue.pushMetric(this.metricsRecord); + this.logReadRateInByte.pushMetric(this.metricsRecord); } this.metricsRecord.update(); }