Index: src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java	(revision 1402361)
+++ src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java	(working copy)
@@ -391,6 +391,8 @@
       this.istream.close();
       this.istream = null;
     }
+
+    getSchemaMetrics().flushMetrics();
   }
 
   protected abstract static class AbstractScannerV1
Index: src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java	(revision 1402361)
+++ src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java	(working copy)
@@ -424,6 +424,8 @@
         istream = null;
       }
     }
+
+    getSchemaMetrics().flushMetrics();
   }
 
   protected abstract static class AbstractScannerV2
Index: src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaMetrics.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaMetrics.java	(revision 1402361)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaMetrics.java	(working copy)
@@ -29,6 +29,7 @@
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLongArray;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -38,7 +39,6 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory;
-import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 
@@ -236,6 +236,9 @@
   public static final SchemaMetrics ALL_SCHEMA_METRICS =
     getInstance(TOTAL_KEY, TOTAL_KEY);
 
+  /** Threshold for flushing buffered metrics; currently used only for "on cache hit" metrics. */
+  private static final long THRESHOLD_METRICS_FLUSH = 100L;
+
   /**
    * Whether to include table name in metric names. If this is null, it has not
    * been initialized. This is a global instance, but we also have a copy of it
@@ -254,6 +257,8 @@
   private final String[] bloomMetricNames = new String[2];
   private final String[] storeMetricNames = new String[NUM_STORE_METRIC_TYPES];
   private final String[] storeMetricNamesMax = new String[NUM_STORE_METRIC_TYPES];
+  private final AtomicLongArray onHitCacheMetrics =
+      new AtomicLongArray(NUM_BLOCK_CATEGORIES * BOOL_VALUES.length);
 
   private SchemaMetrics(final String tableName, final String cfName) {
     String metricPrefix = SchemaMetrics.generateSchemaMetricsPrefix(
@@ -261,6 +266,9 @@
 
     for (BlockCategory blockCategory : BlockCategory.values()) {
       for (boolean isCompaction : BOOL_VALUES) {
+        // initialize the cache metrics
+        onHitCacheMetrics.set(getCacheHitMetricIndex(blockCategory, isCompaction), 0);
+
         for (BlockMetricType metricType : BlockMetricType.values()) {
           if (!metricType.compactionAware && isCompaction) {
             continue;
@@ -336,6 +344,15 @@
     return existingMetrics != null ? existingMetrics : schemaMetrics;
   }
 
+  /**
+   * Returns the slot of {@link #onHitCacheMetrics} that buffers cache hits for
+   * the given block category and compaction flag.
+   */
+  private static final int getCacheHitMetricIndex(BlockCategory blockCategory,
+      boolean isCompaction) {
+    return blockCategory.ordinal() * BOOL_VALUES.length + (isCompaction ? 1 : 0);
+  }
+
   private static final int getBlockMetricIndex(BlockCategory blockCategory,
       boolean isCompaction, BlockMetricType metricType) {
     int i = 0;
@@ -365,12 +382,22 @@
    */
   private void incrNumericMetric(BlockCategory blockCategory,
       boolean isCompaction, BlockMetricType metricType) {
+    incrNumericMetric(blockCategory, isCompaction, metricType, 1);
+  }
+
+  /**
+   * Increments the given metric by the given amount, both per-CF and
+   * aggregate, for both the given category and all categories in aggregate
+   * (four counters total).
+   */
+  private void incrNumericMetric(BlockCategory blockCategory,
+      boolean isCompaction, BlockMetricType metricType, long amount) {
     if (blockCategory == null) {
       blockCategory = BlockCategory.UNKNOWN;  // So that we see this in stats.
     }
     RegionMetricsStorage.incrNumericMetric(getBlockMetricName(blockCategory,
-        isCompaction, metricType), 1);
+        isCompaction, metricType), amount);
 
     if (blockCategory != BlockCategory.ALL_CATEGORIES) {
       incrNumericMetric(BlockCategory.ALL_CATEGORIES, isCompaction,
-          metricType);
+          metricType, amount);
@@ -445,15 +472,67 @@
    */
   public void updateOnCacheHit(BlockCategory blockCategory,
       boolean isCompaction) {
+    updateOnCacheHit(blockCategory, isCompaction, 1);
+  }
+
+  /**
+   * Records the given number of block cache hits. Hits are buffered and only
+   * added to the hit and read counters once more than
+   * {@code THRESHOLD_METRICS_FLUSH} have accumulated for this block category
+   * and compaction flag, or when {@link #flushMetrics()} is called.
+   */
+  public void updateOnCacheHit(BlockCategory blockCategory,
+      boolean isCompaction, long count) {
     blockCategory.expectSpecific();
-    incrNumericMetric(blockCategory, isCompaction, BlockMetricType.CACHE_HIT);
-    incrNumericMetric(blockCategory, isCompaction, BlockMetricType.READ_COUNT);
-    if (this != ALL_SCHEMA_METRICS) {
-      ALL_SCHEMA_METRICS.updateOnCacheHit(blockCategory, isCompaction);
+    int idx = getCacheHitMetricIndex(blockCategory, isCompaction);
+
+    if (this.onHitCacheMetrics.addAndGet(idx, count) > THRESHOLD_METRICS_FLUSH) {
+      flushCertainOnCacheHitMetrics(blockCategory, isCompaction);
     }
   }
+
+  /**
+   * Moves the buffered cache hits for one block category and compaction flag
+   * into the hit and read counters, then passes the same count on to
+   * {@link #ALL_SCHEMA_METRICS} unless this is that instance.
+   */
+  private void flushCertainOnCacheHitMetrics(BlockCategory blockCategory, boolean isCompaction) {
+    int idx = getCacheHitMetricIndex(blockCategory, isCompaction);
+    long tempCount = this.onHitCacheMetrics.getAndSet(idx, 0);
+
+    if (tempCount > 0) {
+      incrNumericMetric(blockCategory, isCompaction, BlockMetricType.CACHE_HIT, tempCount);
+      incrNumericMetric(blockCategory, isCompaction, BlockMetricType.READ_COUNT, tempCount);
+      if (this != ALL_SCHEMA_METRICS) {
+        ALL_SCHEMA_METRICS.updateOnCacheHit(blockCategory, isCompaction, tempCount);
+      }
+    }
+  }
+
+  /**
+   * Flushes the buffered cache hit counts for every block category.
+   */
+  private void flushOnCacheHitMetrics() {
+    for (BlockCategory blockCategory : BlockCategory.values()) {
+      for (boolean isCompaction : BOOL_VALUES) {
+        flushCertainOnCacheHitMetrics(blockCategory, isCompaction);
+      }
+    }
+  }
 
   /**
+   * Flushes all buffered metrics, for this instance and for the aggregate one.
+   */
+  public void flushMetrics() {
+    // currently only for "on cache hit metrics"
+    flushOnCacheHitMetrics();
+
+    if (this != ALL_SCHEMA_METRICS) {
+      ALL_SCHEMA_METRICS.flushMetrics();
+    }
+  }
+
+  /**
    * Updates read time, the number of misses, and the total number of block
    * reads on a block cache miss.
    */
Index: src/test/java/org/apache/hadoop/hbase/regionserver/metrics/TestSchemaMetrics.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/regionserver/metrics/TestSchemaMetrics.java	(revision 1402361)
+++ src/test/java/org/apache/hadoop/hbase/regionserver/metrics/TestSchemaMetrics.java	(working copy)
@@ -201,8 +201,10 @@
 
         for (boolean isCompaction : BOOL_VALUES) {
           sm.updateOnCacheHit(blockCat, isCompaction);
+          sm.flushMetrics();
           checkMetrics();
           sm.updateOnCacheMiss(blockCat, isCompaction, rand.nextInt());
+          sm.flushMetrics();
           checkMetrics();
         }
 