commit 5a79762a18379b9f6e0b2b3481fb7da95cfb8323
Author: Yu Li
Date:   Mon Jan 25 14:04:47 2016 +0800

    HBASE-15160 Put back HFile's HDFS op latency sampling code and add metrics for monitoring

    This partially reverts HBASE-11586 (commit 13643807)

diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java
index ee0217a..ae0ff6d 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java
@@ -141,6 +141,24 @@ public interface MetricsRegionServerSource extends BaseSource {
    */
   void updateFlushTime(long t);

+  /**
+   * Update the fs sequential read time histogram
+   * @param t time it took, in milliseconds
+   */
+  void updateFsReadTime(long t);
+
+  /**
+   * Update the fs positional read time histogram
+   * @param t time it took, in milliseconds
+   */
+  void updateFsPReadTime(long t);
+
+  /**
+   * Update the fs write time histogram
+   * @param t time it took, in milliseconds
+   */
+  void updateFsWriteTime(long t);
+
   // Strings used for exporting to metrics system.
   String REGION_COUNT = "regionCount";
   String REGION_COUNT_DESC = "Number of regions";
@@ -327,4 +345,8 @@ public interface MetricsRegionServerSource extends BaseSource {
   String SPLIT_SUCCESS_KEY = "splitSuccessCount";
   String SPLIT_SUCCESS_DESC = "Number of successfully executed splits";
   String FLUSH_KEY = "flushTime";
+
+  String FS_READ_KEY = "fsReadTime";
+  String FS_PREAD_KEY = "fsPReadTime";
+  String FS_WRITE_KEY = "fsWriteTime";
 }
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java
index f40811c..6a5e59a 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java
@@ -56,6 +56,10 @@ public class MetricsRegionServerSourceImpl
   private final MetricHistogram splitTimeHisto;
   private final MetricHistogram flushTimeHisto;

+  private final MetricHistogram fsReadTimeHisto;
+  private final MetricHistogram fsPReadTimeHisto;
+  private final MetricHistogram fsWriteTimeHisto;
+
   public MetricsRegionServerSourceImpl(MetricsRegionServerWrapper rsWrap) {
     this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT, METRICS_JMX_CONTEXT, rsWrap);
   }
@@ -91,6 +95,10 @@ public class MetricsRegionServerSourceImpl
     splitRequest = getMetricsRegistry().newCounter(SPLIT_REQUEST_KEY, SPLIT_REQUEST_DESC, 0L);
     splitSuccess = getMetricsRegistry().newCounter(SPLIT_SUCCESS_KEY, SPLIT_SUCCESS_DESC, 0L);
+
+    fsReadTimeHisto = getMetricsRegistry().newTimeHistogram(FS_READ_KEY);
+    fsPReadTimeHisto = getMetricsRegistry().newTimeHistogram(FS_PREAD_KEY);
+    fsWriteTimeHisto = getMetricsRegistry().newTimeHistogram(FS_WRITE_KEY);
   }

   @Override
@@ -173,6 +181,21 @@ public class MetricsRegionServerSourceImpl
     flushTimeHisto.add(t);
   }

+  @Override
+  public void updateFsReadTime(long t) {
+    fsReadTimeHisto.add(t);
+  };
+
+  @Override
+  public void updateFsPReadTime(long t) {
+    fsPReadTimeHisto.add(t);
+  };
+
+  @Override
+  public void updateFsWriteTime(long t) {
+    fsWriteTimeHisto.add(t);
+  };
+
   /**
    * Yes this is a get function that doesn't return anything. Thanks Hadoop for breaking all
    * expectations of java programmers. Instead of returning anything Hadoop metrics expects
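The two files above add the exporting side of the change: three new time histograms (fsReadTime, fsPReadTime, fsWriteTime) are registered and fed through MetricsRegionServerSource, mirroring the existing flush and split timings. As a rough sketch of the pattern in plain Java, with a hypothetical ToyTimeHistogram standing in for HBase's MetricHistogram (illustrative only, not the HBase API):

    import java.util.concurrent.atomic.AtomicLong;

    // Toy stand-in for a named time histogram: a real MetricHistogram also
    // tracks min/max/quantiles, but a count and a running total are enough
    // to show where the updateFs*Time(t) values end up.
    class ToyTimeHistogram {
      private final String name;                      // e.g. "fsReadTime"
      private final AtomicLong count = new AtomicLong();
      private final AtomicLong totalMs = new AtomicLong();

      ToyTimeHistogram(String name) { this.name = name; }

      // Mirrors MetricHistogram.add(t): record one observation in milliseconds.
      void add(long ms) {
        count.incrementAndGet();
        totalMs.addAndGet(ms);
      }

      String snapshot() {
        long n = count.get();
        return name + ": n=" + n + ", avgMs=" + (n == 0 ? 0 : totalMs.get() / n);
      }
    }

Note the units: the interface methods take milliseconds, while the later hunks show the raw samples being collected in nanoseconds and converted before they reach these histograms.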
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
index 1e1835f..9bd9ae7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
@@ -34,12 +34,13 @@ import java.util.Map;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;

-import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -50,6 +51,7 @@ import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
 import org.apache.hadoop.hbase.io.compress.Compression;
@@ -60,12 +62,14 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair;
 import org.apache.hadoop.hbase.protobuf.generated.HFileProtos;
 import org.apache.hadoop.hbase.util.BloomFilterWriter;
+import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.io.Writable;

 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;

 /**
  * File format for hbase.
@@ -178,12 +182,102 @@ public class HFile {
    * The number of bytes per checksum.
    */
   public static final int DEFAULT_BYTES_PER_CHECKSUM = 16 * 1024;
+
+  // For measuring latency of "sequential" reads and writes
+  private static final AtomicInteger readOps = new AtomicInteger();
+  private static final AtomicLong readTimeNano = new AtomicLong();
+  private static final AtomicInteger writeOps = new AtomicInteger();
+  private static final AtomicLong writeTimeNano = new AtomicLong();
+
+  // For measuring latency of pread
+  private static final AtomicInteger preadOps = new AtomicInteger();
+  private static final AtomicLong preadTimeNano = new AtomicLong();
+
   // For measuring number of checksum failures
   static final AtomicLong checksumFailures = new AtomicLong();

+  // For getting more detailed stats on FS latencies
+  // If, for some reason, the metrics subsystem stops polling for latencies,
+  // I don't want data to pile up in a memory leak
+  // so, after LATENCY_BUFFER_SIZE items have been enqueued for processing,
+  // fs latency stats will be dropped (and this behavior will be logged)
+  private static final int LATENCY_BUFFER_SIZE = 5000;
+  private static final BlockingQueue<Long> fsReadLatenciesNanos =
+      new ArrayBlockingQueue<Long>(LATENCY_BUFFER_SIZE);
+  private static final BlockingQueue<Long> fsWriteLatenciesNanos =
+      new ArrayBlockingQueue<Long>(LATENCY_BUFFER_SIZE);
+  private static final BlockingQueue<Long> fsPreadLatenciesNanos =
+      new ArrayBlockingQueue<Long>(LATENCY_BUFFER_SIZE);
+
+  public static final void offerReadLatency(long latencyNanos, boolean pread) {
+    if (pread) {
+      fsPreadLatenciesNanos.offer(latencyNanos); // might be silently dropped, if the queue is full
+      preadOps.incrementAndGet();
+      preadTimeNano.addAndGet(latencyNanos);
+    } else {
+      fsReadLatenciesNanos.offer(latencyNanos); // might be silently dropped, if the queue is full
+      readTimeNano.addAndGet(latencyNanos);
+      readOps.incrementAndGet();
+    }
+  }
+
+  public static final void offerWriteLatency(long latencyNanos) {
+    fsWriteLatenciesNanos.offer(latencyNanos); // might be silently dropped, if the queue is full
+
+    writeTimeNano.addAndGet(latencyNanos);
+    writeOps.incrementAndGet();
+  }
+
+  public static final Collection<Long> getReadLatenciesNanos() {
+    final List<Long> latencies =
+        Lists.newArrayListWithCapacity(fsReadLatenciesNanos.size());
+    fsReadLatenciesNanos.drainTo(latencies);
+    return latencies;
+  }
+
+  public static final Collection<Long> getPreadLatenciesNanos() {
+    final List<Long> latencies =
+        Lists.newArrayListWithCapacity(fsPreadLatenciesNanos.size());
+    fsPreadLatenciesNanos.drainTo(latencies);
+    return latencies;
+  }
+
+  public static final Collection<Long> getWriteLatenciesNanos() {
+    final List<Long> latencies =
+        Lists.newArrayListWithCapacity(fsWriteLatenciesNanos.size());
+    fsWriteLatenciesNanos.drainTo(latencies);
+    return latencies;
+  }
+
   // for test purpose
   public static final AtomicLong dataBlockReadCnt = new AtomicLong(0);

+  // number of sequential reads
+  public static final int getReadOps() {
+    return readOps.getAndSet(0);
+  }
+
+  public static final long getReadTimeMs() {
+    return readTimeNano.getAndSet(0) / 1000000;
+  }
+
+  // number of positional reads
+  public static final int getPreadOps() {
+    return preadOps.getAndSet(0);
+  }
+
+  public static final long getPreadTimeMs() {
+    return preadTimeNano.getAndSet(0) / 1000000;
+  }
+
+  public static final int getWriteOps() {
+    return writeOps.getAndSet(0);
+  }
+
+  public static final long getWriteTimeMs() {
+    return writeTimeNano.getAndSet(0) / 1000000;
+  }
+
   /**
    * Number of checksum verification failures. It also
    * clears the counter.
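This hunk is the heart of the revert, and two mechanisms run side by side. Fixed-capacity ArrayBlockingQueues hold individual samples for detailed stats, filled with the non-blocking offer() so the I/O hot path never stalls on metrics; a full buffer drops the sample (despite the block comment's promise of logging, the return value of offer() is ignored, so drops are in fact silent, as the inline comments concede). In parallel, AtomicInteger/AtomicLong pairs accumulate op counts and total nanoseconds, consumed with getAndSet(0) as a snapshot-and-reset. A self-contained sketch of the same pattern, with illustrative names (LatencySampler is not an HBase class):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.atomic.AtomicLong;

    // One instance per op type (read, pread, write) in the scheme above.
    class LatencySampler {
      private static final int BUFFER_SIZE = 5000;   // cap, like LATENCY_BUFFER_SIZE
      private final BlockingQueue<Long> samplesNanos =
          new ArrayBlockingQueue<Long>(BUFFER_SIZE);
      private final AtomicInteger ops = new AtomicInteger();
      private final AtomicLong totalNanos = new AtomicLong();

      // Hot path: never blocks; when the buffer is full the sample is dropped.
      void offer(long latencyNanos) {
        samplesNanos.offer(latencyNanos);
        ops.incrementAndGet();
        totalNanos.addAndGet(latencyNanos);
      }

      // Poller: drain whatever samples accumulated since the last poll.
      List<Long> drainSamples() {
        List<Long> out = new ArrayList<Long>(samplesNanos.size());
        samplesNanos.drainTo(out);
        return out;
      }

      // Poller: snapshot-and-reset the aggregates, converting ns to ms.
      int getAndResetOps() { return ops.getAndSet(0); }
      long getAndResetTotalMs() { return totalNanos.getAndSet(0) / 1000000; }
    }

One subtlety of the snapshot-and-reset aggregates: getReadOps() and getReadTimeMs() are two separate resets, so an op recorded between the two calls can land in the wrong interval. For coarse per-period averages this imprecision is accepted here.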
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
index 4db26d1..e2c52f5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
@@ -1410,12 +1410,12 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
     if (block == -1)
       return null;
     long blockSize = metaBlockIndexReader.getRootBlockDataSize(block);
+    long startTimeNs = System.nanoTime();

     // Per meta key from any given file, synchronize reads for said block. This
     // is OK to do for meta blocks because the meta block index is always
     // single-level.
-    synchronized (metaBlockIndexReader
-        .getRootBlockKey(block)) {
+    synchronized (metaBlockIndexReader.getRootBlockKey(block)) {
       // Check cache for block. If found return.
       long metaBlockOffset = metaBlockIndexReader.getRootBlockOffset(block);
       BlockCacheKey cacheKey = new BlockCacheKey(name, metaBlockOffset,
@@ -1436,6 +1436,8 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {

       HFileBlock metaBlock = fsBlockReader.readBlockData(metaBlockOffset,
           blockSize, -1, true).unpack(hfileContext, fsBlockReader);
+      final long delta = System.nanoTime() - startTimeNs;
+      HFile.offerReadLatency(delta, true);

       // Cache the block
       if (cacheBlock) {
@@ -1516,11 +1518,14 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
           traceScope.getSpan().addTimelineAnnotation("blockCacheMiss");
         }
         // Load block from filesystem.
+        long startTimeNs = System.nanoTime();
         HFileBlock hfileBlock = fsBlockReader.readBlockData(dataBlockOffset, onDiskBlockSize, -1,
             pread);
         validateBlockType(hfileBlock, expectedBlockType);
         HFileBlock unpacked = hfileBlock.unpack(hfileContext, fsBlockReader);
         BlockType.BlockCategory category = hfileBlock.getBlockType().getCategory();
+        final long delta = System.nanoTime() - startTimeNs;
+        HFile.offerReadLatency(delta, pread);

         // Cache the block if necessary
         if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) {
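Both read paths now bracket the filesystem access with System.nanoTime(). Two details are worth noting from the hunks. In getMetaBlock the clock starts before the synchronized block and the cache lookup, so on a cache miss the measured interval includes lock wait and the cache check (meta reads are reported as preads via offerReadLatency(delta, true); a cache hit records nothing). In readBlock the delta is taken only after unpack(), so block type validation and decompression are inside the interval: the histogram measures "time until a usable block", not raw HDFS I/O alone. The pattern itself, as a minimal sketch (readBlockFromFs is a hypothetical stand-in for the real fsBlockReader call, and LatencySampler is the sketch above):

    // Minimal sketch of the read-side instrumentation (hypothetical names).
    class TimedBlockReader {
      private final LatencySampler readSampler = new LatencySampler();

      byte[] timedRead(long offset, int size) throws java.io.IOException {
        long startTimeNs = System.nanoTime();
        byte[] block = readBlockFromFs(offset, size); // stand-in for fsBlockReader.readBlockData(...)
        readSampler.offer(System.nanoTime() - startTimeNs);
        return block;
      }

      private byte[] readBlockFromFs(long offset, int size) throws java.io.IOException {
        return new byte[size]; // placeholder for the real filesystem read
      }
    }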
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
index 186d86b..4d2e014 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
@@ -309,6 +309,7 @@ public class HFileWriterImpl implements HFile.Writer {
   private void finishBlock() throws IOException {
     if (!fsBlockWriter.isWriting() || fsBlockWriter.blockSizeWritten() == 0) return;

+    long startTimeNs = System.nanoTime();
     // Update the first data block offset for scanning.
     if (firstDataBlockOffset == -1) {
       firstDataBlockOffset = outputStream.getPos();
@@ -322,6 +323,7 @@ public class HFileWriterImpl implements HFile.Writer {
     dataBlockIndexWriter.addEntry(CellUtil.getCellKeySerializedAsKeyValueKey(indexEntry),
       lastDataBlockOffset, onDiskSize);
     totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
+    HFile.offerWriteLatency(System.nanoTime() - startTimeNs);
     if (cacheConf.shouldCacheDataOnWrite()) {
       doCacheOnWrite(lastDataBlockOffset);
     }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java
index 91f494a..4f81688 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java
@@ -116,4 +116,16 @@ public class MetricsRegionServer {
   public void updateFlushTime(long t) {
     serverSource.updateFlushTime(t);
   }
+
+  public void updateFsReadTime(long t) {
+    serverSource.updateFsReadTime(t);
+  }
+
+  public void updateFsPReadTime(long t) {
+    serverSource.updateFsPReadTime(t);
+  }
+
+  public void updateFsWriteTime(long t) {
+    serverSource.updateFsWriteTime(t);
+  }
 }
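On the write side, the clock starts at the top of finishBlock() and stops only after the block index entry has been recorded, so the sample covers block serialization and flushing plus a little index bookkeeping, not just the raw stream write. MetricsRegionServer itself is a plain facade: each updateFs*Time delegates straight to the source, which feeds the histograms registered earlier. A sketch of the write-side shape (hypothetical helpers, not the HFileWriterImpl internals):

    // Write-side sketch (illustrative only): everything between the two
    // nanoTime() calls lands in the write-latency sample, as in finishBlock().
    class TimedBlockWriter {
      private final LatencySampler writeSampler = new LatencySampler();

      void finishBlock() throws java.io.IOException {
        long startTimeNs = System.nanoTime();
        flushBlockToStream();   // hypothetical: the actual filesystem write
        recordIndexEntry();     // hypothetical: index bookkeeping, also timed
        writeSampler.offer(System.nanoTime() - startTimeNs);
      }

      private void flushBlockToStream() throws java.io.IOException { /* ... */ }
      private void recordIndexEntry() { /* ... */ }
    }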
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
index f3e8916..feadcb9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.io.hfile.BlockCache;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.CacheStats;
+import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.mob.MobCacheConfig;
 import org.apache.hadoop.hbase.mob.MobFileCache;
 import org.apache.hadoop.hbase.regionserver.wal.MetricsWALSource;
@@ -728,6 +729,18 @@ class MetricsRegionServerWrapperImpl
           mobFileCacheEvictedCount = mobFileCache.getEvictedFileCount();
           mobFileCacheCount = mobFileCache.getCacheSize();
           blockedRequestsCount = tempBlockedRequestsCount;
+          int ops = HFile.getReadOps();
+          if (0 != ops) {
+            regionServer.metricsRegionServer.updateFsReadTime(HFile.getReadTimeMs() / ops);
+          }
+          ops = HFile.getPreadOps();
+          if (0 != ops) {
+            regionServer.metricsRegionServer.updateFsPReadTime(HFile.getPreadTimeMs() / ops);
+          }
+          ops = HFile.getWriteOps();
+          if (0 != ops) {
+            regionServer.metricsRegionServer.updateFsWriteTime(HFile.getWriteTimeMs() / ops);
+          }
         } catch (Throwable e) {
           LOG.warn("Caught exception! Will suppress and retry.", e);
         }
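The wrapper closes the loop: on each metrics period it snapshots the accumulated op counts and total times from HFile and pushes one average per op type into the corresponding histogram. Two consequences follow from the code as written. The division is integer arithmetic on milliseconds, so a period whose average latency is under 1 ms reports as 0. And each histogram receives a single per-period average rather than per-op samples, so its percentiles describe the distribution of period averages; in the hunks shown here, nothing drains the per-sample queues (getReadLatenciesNanos() and friends), only these aggregates are consumed. A sketch of the poller step, reusing the illustrative classes from above:

    // Poller-side sketch (illustrative): one averaged value per metrics period.
    class FsLatencyPoller {
      static void report(LatencySampler sampler, ToyTimeHistogram histogram) {
        int ops = sampler.getAndResetOps();
        if (ops != 0) {
          histogram.add(sampler.getAndResetTotalMs() / ops); // integer division
        }
      }
    }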