commit c8a1ca54726d325fb234ef59d6c6e683cdcce223 Author: nspiegelberg Date: 35 seconds ago HBASE-4765 Enhance HLog Metrics diff --git a/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java b/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java index 79d71a0..7be6e6a 100644 --- a/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java +++ b/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java @@ -26,6 +26,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; @@ -142,15 +143,15 @@ public class HFile { static final char CACHE_KEY_SEPARATOR = '_'; // For measuring latency of "typical" reads and writes - static volatile AtomicLong readOps = new AtomicLong(); + static volatile AtomicInteger readOps = new AtomicInteger(); static volatile AtomicLong readTimeNano = new AtomicLong(); - static volatile AtomicLong writeOps = new AtomicLong(); + static volatile AtomicInteger writeOps = new AtomicInteger(); static volatile AtomicLong writeTimeNano = new AtomicLong(); // for test purpose public static volatile AtomicLong dataBlockReadCnt = new AtomicLong(0); - public static final long getReadOps() { + public static final int getReadOps() { return readOps.getAndSet(0); } @@ -158,7 +159,7 @@ public class HFile { return readTimeNano.getAndSet(0) / 1000000; } - public static final long getWriteOps() { + public static final int getWriteOps() { return writeOps.getAndSet(0); } diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerMetrics.java b/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerMetrics.java index d512686..d1db857 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerMetrics.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerMetrics.java @@ -192,6 +192,12 @@ 
public class RegionServerMetrics implements Updater { new MetricsTimeVaryingRate("fsWriteLatency", registry); /** + * size (in bytes) of data in HLog append calls + */ + public final MetricsTimeVaryingRate fsWriteSize = + new MetricsTimeVaryingRate("fsWriteSize", registry); + + /** * filesystem sync latency */ public final MetricsTimeVaryingRate fsSyncLatency = @@ -299,19 +305,24 @@ public class RegionServerMetrics implements Updater { // minMax.update(timePerOps); // } // Means you can't pass a numOps of zero or get a ArithmeticException / by zero. - int ops = (int)HFile.getReadOps(); + // HLog metrics + addHLogMetric(HLog.getWriteTime(), this.fsWriteLatency); + addHLogMetric(HLog.getWriteSize(), this.fsWriteSize); + addHLogMetric(HLog.getSyncTime(), this.fsSyncLatency); + // HFile metrics + int ops = HFile.getReadOps(); if (ops != 0) this.fsReadLatency.inc(ops, HFile.getReadTimeMs()); - ops = (int)HFile.getWriteOps(); - if (ops != 0) this.fsWriteLatency.inc(ops, HFile.getWriteTimeMs()); - // mix in HLog metrics - ops = (int)HLog.getWriteOps(); - if (ops != 0) this.fsWriteLatency.inc(ops, HLog.getWriteTime()); - ops = (int)HLog.getSyncOps(); - if (ops != 0) this.fsSyncLatency.inc(ops, HLog.getSyncTime()); + /* NOTE: removed HFile write latency. 2 reasons: + * 1) Mixing: HLog latencies are far higher priority since they're + * on-demand and HFile is used in the background (compact/flush) + * 2) HFile metrics are being handled at a higher level + * by compaction & flush metrics. 
+ */ // push the result this.fsReadLatency.pushMetric(this.metricsRecord); this.fsWriteLatency.pushMetric(this.metricsRecord); + this.fsWriteSize.pushMetric(this.metricsRecord); this.fsSyncLatency.pushMetric(this.metricsRecord); this.compactionTime.pushMetric(this.metricsRecord); this.compactionSize.pushMetric(this.metricsRecord); @@ -321,10 +332,23 @@ public class RegionServerMetrics implements Updater { this.metricsRecord.update(); } + private void addHLogMetric(HLog.Metric logMetric, + MetricsTimeVaryingRate hadoopMetric) { + if (logMetric.count > 0) + hadoopMetric.inc(logMetric.min); + if (logMetric.count > 1) + hadoopMetric.inc(logMetric.max); + if (logMetric.count > 2) { + int ops = logMetric.count - 2; + hadoopMetric.inc(ops, logMetric.total - logMetric.max - logMetric.min); + } + } + public void resetAllMinMax() { this.atomicIncrementTime.resetMinMax(); this.fsReadLatency.resetMinMax(); this.fsWriteLatency.resetMinMax(); + this.fsWriteSize.resetMinMax(); this.fsSyncLatency.resetMinMax(); } diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java index 0a9566d..85c8cb1 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java @@ -262,32 +262,50 @@ public class HLog implements Syncable { } } + public static class Metric { + public long min = Long.MAX_VALUE; + public long max = 0; + public long total = 0; + public int count = 0; + + synchronized void inc(final long val) { + min = Math.min(min, val); + max = Math.max(max, val); + total += val; + ++count; + } + + synchronized Metric get() { + Metric copy = new Metric(); + copy.min = min; + copy.max = max; + copy.total = total; + copy.count = count; + this.min = Long.MAX_VALUE; + this.max = 0; + this.total = 0; + this.count = 0; + return copy; + } + } + // For measuring latency of writes - private static volatile long writeOps; - private 
static volatile long writeTime; + private static Metric writeTime = new Metric(); + private static Metric writeSize = new Metric(); // For measuring latency of syncs - private static AtomicLong syncOps = new AtomicLong(); - private static AtomicLong syncTime = new AtomicLong(); + private static Metric syncTime = new Metric(); private static AtomicLong syncBatchSize = new AtomicLong(); - public static long getWriteOps() { - long ret = writeOps; - writeOps = 0; - return ret; - } - - public static long getWriteTime() { - long ret = writeTime; - writeTime = 0; - return ret; + public static Metric getWriteTime() { + return writeTime.get(); } - public static long getSyncOps() { - return syncOps.getAndSet(0); + public static Metric getWriteSize() { + return writeSize.get(); } - public static long getSyncTime() { - return syncTime.getAndSet(0); + public static Metric getSyncTime() { + return syncTime.get(); } public static long getSyncBatchSize() { @@ -1247,8 +1265,7 @@ public class HLog implements Syncable { } // We try to not acquire the updateLock just to update statistics. // Make these statistics as AtomicLong. 
- syncTime.addAndGet(System.currentTimeMillis() - now); - syncOps.incrementAndGet(); + syncTime.inc(System.currentTimeMillis() - now); if (!this.logRollRunning) { checkLowReplication(); if (this.writer.getLength() > this.logrollsize) { @@ -1379,13 +1396,13 @@ public class HLog implements Syncable { } long took = System.currentTimeMillis() - now; coprocessorHost.postWALWrite(info, logKey, logEdit); - writeTime += took; - writeOps++; + writeTime.inc(took); + long len = 0; + for (KeyValue kv : logEdit.getKeyValues()) { + len += kv.getLength(); + } + writeSize.inc(len); if (took > 1000) { - long len = 0; - for(KeyValue kv : logEdit.getKeyValues()) { - len += kv.getLength(); - } LOG.warn(String.format( "%s took %d ms appending an edit to hlog; editcount=%d, len~=%s", Thread.currentThread().getName(), took, this.numEntries.get(), @@ -1504,8 +1521,12 @@ public class HLog implements Syncable { System.currentTimeMillis(), HConstants.DEFAULT_CLUSTER_ID); logSyncerThread.append(new Entry(key, edit)); txid = this.unflushedEntries.incrementAndGet(); - writeTime += System.currentTimeMillis() - now; - writeOps++; + writeTime.inc(System.currentTimeMillis() - now); + long len = 0; + for (KeyValue kv : edit.getKeyValues()) { + len += kv.getLength(); + } + writeSize.inc(len); this.numEntries.incrementAndGet(); } // sync txn to file system diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 3ba5f60..2d3acd9 100644 --- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -348,8 +348,8 @@ public class TestHRegion extends HBaseTestCase { byte[] val = Bytes.toBytes("val"); initHRegion(b, getName(), cf); - HLog.getSyncOps(); // clear counter from prior tests - assertEquals(0, HLog.getSyncOps()); + HLog.getSyncTime(); // clear counter from prior tests + assertEquals(0, HLog.getSyncTime().count); 
LOG.info("First a batch put with all valid puts"); final Put[] puts = new Put[10]; @@ -364,7 +364,7 @@ public class TestHRegion extends HBaseTestCase { assertEquals(OperationStatusCode.SUCCESS, codes[i] .getOperationStatusCode()); } - assertEquals(1, HLog.getSyncOps()); + assertEquals(1, HLog.getSyncTime().count); LOG.info("Next a batch put with one invalid family"); puts[5].add(Bytes.toBytes("BAD_CF"), qual, val); @@ -374,7 +374,7 @@ public class TestHRegion extends HBaseTestCase { assertEquals((i == 5) ? OperationStatusCode.BAD_FAMILY : OperationStatusCode.SUCCESS, codes[i].getOperationStatusCode()); } - assertEquals(1, HLog.getSyncOps()); + assertEquals(1, HLog.getSyncTime().count); LOG.info("Next a batch put that has to break into two batches to avoid a lock"); Integer lockedRow = region.obtainRowLock(Bytes.toBytes("row_2")); @@ -395,7 +395,7 @@ public class TestHRegion extends HBaseTestCase { LOG.info("...waiting for put thread to sync first time"); long startWait = System.currentTimeMillis(); - while (HLog.getSyncOps() == 0) { + while (HLog.getSyncTime().count == 0) { Thread.sleep(100); if (System.currentTimeMillis() - startWait > 10000) { fail("Timed out waiting for thread to sync first minibatch"); @@ -406,7 +406,7 @@ public class TestHRegion extends HBaseTestCase { LOG.info("...joining on thread"); ctx.stop(); LOG.info("...checking that next batch was synced"); - assertEquals(1, HLog.getSyncOps()); + assertEquals(1, HLog.getSyncTime().count); codes = retFromThread.get(); for (int i = 0; i < 10; i++) { assertEquals((i == 5) ? OperationStatusCode.BAD_FAMILY : @@ -430,7 +430,7 @@ public class TestHRegion extends HBaseTestCase { OperationStatusCode.SUCCESS, codes[i].getOperationStatusCode()); } // Make sure we didn't do an extra batch - assertEquals(1, HLog.getSyncOps()); + assertEquals(1, HLog.getSyncTime().count); // Make sure we still hold lock assertTrue(region.isRowLocked(lockedRow));