Index: src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceMetrics.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceMetrics.java (revision 1594437) +++ src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceMetrics.java (working copy) @@ -44,6 +44,10 @@ public final MetricsRate shippedOpsRate = new MetricsRate("shippedOpsRate", registry); + /** Rate of shipped bytes (in KB) by the source */ + public final MetricsRate shippedKBRate = + new MetricsRate("shippedBytesRate", registry); + /** Rate of shipped batches by the source */ public final MetricsRate shippedBatchesRate = new MetricsRate("shippedBatchesRate", registry); @@ -115,6 +119,7 @@ refreshAgeOfLastShippedOp(); this.shippedOpsRate.pushMetric(this.metricsRecord); this.shippedBatchesRate.pushMetric(this.metricsRecord); + this.shippedKBRate.pushMetric(this.metricsRecord); this.logEditsReadRate.pushMetric(this.metricsRecord); this.logEditsFilteredRate.pushMetric(this.metricsRecord); this.ageOfLastShippedOp.pushMetric(this.metricsRecord); Index: src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (revision 1594437) +++ src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (working copy) @@ -426,6 +426,11 @@ this.peerClusterZnode, this.repLogReader.getPosition(), queueRecovered, currentWALisBeingWrittenTo); this.lastLoggedPosition = this.repLogReader.getPosition(); } + if (!gotIOE) { + // if there was nothing to ship and it's not an error + // set "ageOfLastShippedOp" to to indicate that we're current + this.metrics.setAgeOfLastShippedOp(System.currentTimeMillis()); + } if (sleepForRetries("Nothing to replicate", sleepMultiplier)) { sleepMultiplier++; } @@ -740,7 +745,7 @@ } try { HRegionInterface rrs = getRS(); - LOG.debug("Replicating " + entries.size()); + LOG.debug("Replicating " + entries.size() + ", " + this.currentSize + " bytes"); // can't avoid the copy here, the replicateLogEntries RPC require an HLog.Entry[] rrs.replicateLogEntries(entries.toArray(new HLog.Entry[entries.size()])); if (this.lastLoggedPosition != this.repLogReader.getPosition()) { @@ -750,10 +755,9 @@ } this.totalReplicatedEdits += entries.size(); this.metrics.shippedBatchesRate.inc(1); - this.metrics.shippedOpsRate.inc( - this.currentNbOperations); - this.metrics.setAgeOfLastShippedOp( - entries.get(entries.size()-1).getKey().getWriteTime()); + this.metrics.shippedKBRate.inc(this.currentSize/1024); + this.metrics.shippedOpsRate.inc(this.currentNbOperations); + this.metrics.setAgeOfLastShippedOp(entries.get(entries.size()-1).getKey().getWriteTime()); LOG.debug("Replicated in total: " + this.totalReplicatedEdits); break;