diff --git hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java index 3aa01ab..271f0ac 100644 --- hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java +++ hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java @@ -24,7 +24,10 @@ public interface MetricsReplicationSourceSource { public static final String SOURCE_AGE_OF_LAST_SHIPPED_OP = "source.ageOfLastShippedOp"; public static final String SOURCE_SHIPPED_BATCHES = "source.shippedBatches"; + @Deprecated + /** @deprecated Use SOURCE_SHIPPED_BYTES instead */ public static final String SOURCE_SHIPPED_KBS = "source.shippedKBs"; + public static final String SOURCE_SHIPPED_BYTES = "source.shippedBytes"; public static final String SOURCE_SHIPPED_OPS = "source.shippedOps"; public static final String SOURCE_LOG_READ_IN_BYTES = "source.logReadInBytes"; @@ -41,7 +44,7 @@ public interface MetricsReplicationSourceSource { void incrLogEditsFiltered(long size); void incrBatchesShipped(int batches); void incrOpsShipped(long ops); - void incrShippedKBs(long size); + void incrShippedBytes(long size); void incrLogReadInBytes(long size); void incrLogReadInEdits(long size); void clear(); diff --git hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java index 2526f32..476d2f7 100644 --- hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java +++ hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java @@ -30,6 +30,8 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS private final MutableFastCounter logEditsFilteredCounter; private final MutableFastCounter shippedBatchesCounter; private final MutableFastCounter shippedOpsCounter; + private final MutableFastCounter shippedBytesCounter; + @Deprecated private final MutableFastCounter shippedKBsCounter; private final MutableFastCounter logReadInBytesCounter; private final MutableFastCounter shippedHFilesCounter; @@ -48,6 +50,8 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS shippedKBsCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_KBS, 0L); + shippedBytesCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_BYTES, 0L); + logReadInBytesCounter = rms.getMetricsRegistry().getCounter(SOURCE_LOG_READ_IN_BYTES, 0L); logReadInEditsCounter = rms.getMetricsRegistry().getCounter(SOURCE_LOG_READ_IN_EDITS, 0L); @@ -88,8 +92,25 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS shippedOpsCounter.incr(ops); } - @Override public void incrShippedKBs(long size) { - shippedKBsCounter.incr(size); + @Override public void incrShippedBytes(long size) { + shippedBytesCounter.incr(size); + // obtained value maybe smaller than 1024. We should make sure that KB count + // eventually picks up even from multiple smaller updates. + incrementKBsCounter(shippedBytesCounter, shippedKBsCounter); + } + + static void incrementKBsCounter(MutableFastCounter bytesCounter, MutableFastCounter kbsCounter) { + // Following code should be thread-safe. + long delta = 0; + while(true) { + long bytes = bytesCounter.value(); + delta = (bytes / 1024) - kbsCounter.value(); + if (delta > 0) { + kbsCounter.incr(delta); + } else { + break; + } + } } @Override public void incrLogReadInBytes(long size) { diff --git hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java index 03e3116..835e81c 100644 --- hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java +++ hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java @@ -30,7 +30,9 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou private final String logEditsFilteredKey; private final String shippedBatchesKey; private final String shippedOpsKey; + @Deprecated private final String shippedKBsKey; + private final String shippedBytesKey; private final String logReadInBytesKey; private final String shippedHFilesKey; private final String sizeOfHFileRefsQueueKey; @@ -42,6 +44,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou private final MutableFastCounter shippedBatchesCounter; private final MutableFastCounter shippedOpsCounter; private final MutableFastCounter shippedKBsCounter; + private final MutableFastCounter shippedBytesCounter; private final MutableFastCounter logReadInBytesCounter; private final MutableFastCounter shippedHFilesCounter; private final MutableGaugeLong sizeOfHFileRefsQueueGauge; @@ -65,6 +68,9 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou shippedKBsKey = "source." + this.id + ".shippedKBs"; shippedKBsCounter = rms.getMetricsRegistry().getCounter(shippedKBsKey, 0L); + shippedBytesKey = "source." + this.id + ".shippedBytes"; + shippedBytesCounter = rms.getMetricsRegistry().getCounter(shippedBytesKey, 0L); + logReadInBytesKey = "source." + this.id + ".logReadInBytes"; logReadInBytesCounter = rms.getMetricsRegistry().getCounter(logReadInBytesKey, 0L); @@ -109,8 +115,10 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou shippedOpsCounter.incr(ops); } - @Override public void incrShippedKBs(long size) { - shippedKBsCounter.incr(size); + @Override public void incrShippedBytes(long size) { + shippedBytesCounter.incr(size); + MetricsReplicationGlobalSourceSource + .incrementKBsCounter(shippedBytesCounter, shippedKBsCounter); } @Override public void incrLogReadInBytes(long size) { @@ -125,6 +133,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou rms.removeMetric(shippedBatchesKey); rms.removeMetric(shippedOpsKey); rms.removeMetric(shippedKBsKey); + rms.removeMetric(shippedBytesKey); rms.removeMetric(logReadInBytesKey); rms.removeMetric(logReadInEditsKey); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java index 4a044bf..b07f1d1 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java @@ -136,15 +136,15 @@ public class MetricsSource { * * @param batchSize the size of the batch that was shipped to sinks. */ - public void shipBatch(long batchSize, int sizeInKB) { + public void shipBatch(long batchSize, int sizeInBytes) { singleSourceSource.incrBatchesShipped(1); globalSourceSource.incrBatchesShipped(1); singleSourceSource.incrOpsShipped(batchSize); globalSourceSource.incrOpsShipped(batchSize); - singleSourceSource.incrShippedKBs(sizeInKB); - globalSourceSource.incrShippedKBs(sizeInKB); + singleSourceSource.incrShippedBytes(sizeInBytes); + globalSourceSource.incrShippedBytes(sizeInBytes); } /** @@ -153,8 +153,8 @@ public class MetricsSource { * @param batchSize the size of the batch that was shipped to sinks. * @param hfiles total number of hfiles shipped to sinks. */ - public void shipBatch(long batchSize, int sizeInKB, long hfiles) { - shipBatch(batchSize, sizeInKB); + public void shipBatch(long batchSize, int sizeInBytes, long hfiles) { + shipBatch(batchSize, sizeInBytes); singleSourceSource.incrHFilesShipped(hfiles); globalSourceSource.incrHFilesShipped(hfiles); } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 7e58e41..631603f 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -1026,7 +1026,7 @@ public class ReplicationSource extends Thread totalReplicatedEdits.addAndGet(entries.size()); totalReplicatedOperations.addAndGet(currentNbOperations); // FIXME check relationship between wal group and overall - metrics.shipBatch(currentNbOperations, currentSize / 1024, currentNbHFiles); + metrics.shipBatch(currentNbOperations, currentSize, currentNbHFiles); metrics.setAgeOfLastShippedOp(entries.get(entries.size() - 1).getKey().getWriteTime(), walGroupId); if (LOG.isTraceEnabled()) {