commit 7e7bfdbfaa0f43ba5d7941b143c09a6cd0eae35d
Author: Yu Li
Date:   Tue Jan 26 23:41:09 2016 +0800

    HBASE-15160 Put back HFile's HDFS op latency sampling code and add metrics for monitoring

    This partially reverts HBASE-11586 (commit 13643807)

diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java
index 0f2f90b..81d63db 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java
@@ -141,6 +141,24 @@ public interface MetricsRegionServerSource extends BaseSource {
    */
   void updateFlushTime(long t);
 
+  /**
+   * Update the fs sequential read time histogram
+   * @param t time it took, in milliseconds
+   */
+  void updateFsReadTime(long t);
+
+  /**
+   * Update the fs positional read time histogram
+   * @param t time it took, in milliseconds
+   */
+  void updateFsPReadTime(long t);
+
+  /**
+   * Update the fs write time histogram
+   * @param t time it took, in milliseconds
+   */
+  void updateFsWriteTime(long t);
+
   // Strings used for exporting to metrics system.
   String REGION_COUNT = "regionCount";
   String REGION_COUNT_DESC = "Number of regions";
@@ -342,4 +360,15 @@ public interface MetricsRegionServerSource extends BaseSource {
   String RPC_MUTATE_REQUEST_COUNT = "rpcMutateRequestCount";
   String RPC_MUTATE_REQUEST_COUNT_DESC =
       "Number of rpc mutation requests this region server has answered.";
+
+  String FS_READ_KEY = "fsReadTime";
+  String FS_PREAD_KEY = "fsPReadTime";
+  String FS_WRITE_KEY = "fsWriteTime";
+
+  String FS_READ_LATENCY = "fsReadLatency";
+  String FS_READ_LATENCY_DESC = "Latency of HFile's sequential reads on this region server.";
+  String FS_PREAD_LATENCY = "fsPreadLatency";
+  String FS_PREAD_LATENCY_DESC = "Latency of HFile's positional reads on this region server.";
+  String FS_WRITE_LATENCY = "fsWriteLatency";
+  String FS_WRITE_LATENCY_DESC = "Latency of HFile's writes on this region server.";
 }

diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java
index ee2b5a1..b1d9f11 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java
@@ -392,4 +392,19 @@ public interface MetricsRegionServerWrapper {
    * Get the number of rpc mutate requests to this region server.
    */
   long getRpcMutateRequestsCount();
+
+  /**
+   * Get the average latency, in milliseconds, of HFile's sequential reads over the last
+   * metrics collection period.
+   */
+  long getFsReadLatency();
+
+  /**
+   * Get the average latency, in milliseconds, of HFile's positional reads over the last
+   * metrics collection period.
+   */
+  long getFsPreadLatency();
+
+  /**
+   * Get the average latency, in milliseconds, of HFile's writes over the last
+   * metrics collection period.
+   */
+  long getFsWriteLatency();
 }
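Two reporting channels are added above for the same measurements: histogram update methods on the source, and per-period average getters on the wrapper (surfaced as gauges). The distinction matters because an average hides outliers that a histogram preserves. A standalone sketch, with hypothetical names and not part of the patch, of the gauge-style computation and its blind spot:

    // Minimal sketch (hypothetical names) of the gauge-style, per-period average:
    // total time divided by op count, both reset on every collection.
    import java.util.concurrent.atomic.AtomicLong;

    public class GaugeVsHistogramSketch {
      private static final AtomicLong readOps = new AtomicLong();
      private static final AtomicLong readTimeMs = new AtomicLong();

      static void recordRead(long ms) {
        readOps.incrementAndGet();
        readTimeMs.addAndGet(ms);
      }

      static long averageForPeriod() {
        long ops = readOps.getAndSet(0);
        long time = readTimeMs.getAndSet(0);
        return ops == 0 ? 0 : time / ops; // one number per period
      }

      public static void main(String[] args) {
        recordRead(1); recordRead(1); recordRead(2000); // one slow outlier
        System.out.println(averageForPeriod()); // prints 667: the 2s spike is diluted,
        // which is why the patch also feeds raw samples into per-sample histograms
      }
    }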
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java
index 9134f46..a2570e5 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java
@@ -56,6 +56,10 @@ public class MetricsRegionServerSourceImpl
   private final MetricHistogram splitTimeHisto;
   private final MetricHistogram flushTimeHisto;
 
+  private final MetricHistogram fsReadTimeHisto;
+  private final MetricHistogram fsPReadTimeHisto;
+  private final MetricHistogram fsWriteTimeHisto;
+
   public MetricsRegionServerSourceImpl(MetricsRegionServerWrapper rsWrap) {
     this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT, METRICS_JMX_CONTEXT, rsWrap);
   }
@@ -91,6 +95,10 @@ public class MetricsRegionServerSourceImpl
     splitRequest = getMetricsRegistry().newCounter(SPLIT_REQUEST_KEY, SPLIT_REQUEST_DESC, 0L);
     splitSuccess = getMetricsRegistry().newCounter(SPLIT_SUCCESS_KEY, SPLIT_SUCCESS_DESC, 0L);
+
+    fsReadTimeHisto = getMetricsRegistry().newTimeHistogram(FS_READ_KEY);
+    fsPReadTimeHisto = getMetricsRegistry().newTimeHistogram(FS_PREAD_KEY);
+    fsWriteTimeHisto = getMetricsRegistry().newTimeHistogram(FS_WRITE_KEY);
   }
@@ -173,6 +181,21 @@ public class MetricsRegionServerSourceImpl
     flushTimeHisto.add(t);
   }
 
+  @Override
+  public void updateFsReadTime(long t) {
+    fsReadTimeHisto.add(t);
+  }
+
+  @Override
+  public void updateFsPReadTime(long t) {
+    fsPReadTimeHisto.add(t);
+  }
+
+  @Override
+  public void updateFsWriteTime(long t) {
+    fsWriteTimeHisto.add(t);
+  }
+
   /**
    * Yes this is a get function that doesn't return anything. Thanks Hadoop for breaking all
    * expectations of java programmers. Instead of returning anything Hadoop metrics expects
@@ -320,7 +343,13 @@ public class MetricsRegionServerSourceImpl
           .tag(Interns.info(ZOOKEEPER_QUORUM_NAME, ZOOKEEPER_QUORUM_DESC),
               rsWrap.getZookeeperQuorum())
           .tag(Interns.info(SERVER_NAME_NAME, SERVER_NAME_DESC), rsWrap.getServerName())
-          .tag(Interns.info(CLUSTER_ID_NAME, CLUSTER_ID_DESC), rsWrap.getClusterId());
+          .tag(Interns.info(CLUSTER_ID_NAME, CLUSTER_ID_DESC), rsWrap.getClusterId())
+
+          .addGauge(Interns.info(FS_READ_LATENCY, FS_READ_LATENCY_DESC), rsWrap.getFsReadLatency())
+          .addGauge(Interns.info(FS_PREAD_LATENCY, FS_PREAD_LATENCY_DESC),
+              rsWrap.getFsPreadLatency())
+          .addGauge(Interns.info(FS_WRITE_LATENCY, FS_WRITE_LATENCY_DESC),
+              rsWrap.getFsWriteLatency());
     }
 
     metricsRegistry.snapshot(mrb, all);
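For readers unfamiliar with Hadoop metrics2, the snapshot code above follows its push model: getMetrics(...) returns nothing and instead writes values into the supplied collector. A minimal sketch of that model, assuming only the metrics2 classes named in the imports (the source class itself is hypothetical):

    // Sketch of the metrics2 push model: the framework polls the source, and the
    // source pushes its current values into the collector it is handed.
    import org.apache.hadoop.metrics2.MetricsCollector;
    import org.apache.hadoop.metrics2.MetricsRecordBuilder;
    import org.apache.hadoop.metrics2.MetricsSource;
    import org.apache.hadoop.metrics2.lib.Interns;

    public class MySource implements MetricsSource {
      private volatile long fsReadLatencyMs; // refreshed by a background sampler

      @Override
      public void getMetrics(MetricsCollector collector, boolean all) {
        MetricsRecordBuilder mrb = collector.addRecord("MyRecord");
        // The gauge value is whatever the wrapper computed for the last period.
        mrb.addGauge(Interns.info("fsReadLatency", "Avg sequential read ms"), fsReadLatencyMs);
      }
    }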
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
index 1e1835f..9bd9ae7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
@@ -34,12 +34,13 @@ import java.util.Map;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -50,6 +51,7 @@ import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
 import org.apache.hadoop.hbase.io.compress.Compression;
@@ -60,12 +62,14 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair;
 import org.apache.hadoop.hbase.protobuf.generated.HFileProtos;
 import org.apache.hadoop.hbase.util.BloomFilterWriter;
+import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.io.Writable;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 
 /**
  * File format for hbase.
@@ -178,12 +182,102 @@ public class HFile {
    * The number of bytes per checksum.
    */
   public static final int DEFAULT_BYTES_PER_CHECKSUM = 16 * 1024;
+
+  // For measuring latency of "sequential" reads and writes
+  private static final AtomicInteger readOps = new AtomicInteger();
+  private static final AtomicLong readTimeNano = new AtomicLong();
+  private static final AtomicInteger writeOps = new AtomicInteger();
+  private static final AtomicLong writeTimeNano = new AtomicLong();
+
+  // For measuring latency of pread
+  private static final AtomicInteger preadOps = new AtomicInteger();
+  private static final AtomicLong preadTimeNano = new AtomicLong();
+
   // For measuring number of checksum failures
   static final AtomicLong checksumFailures = new AtomicLong();
 
+  // For more detailed stats on FS latencies.
+  // If, for some reason, the metrics subsystem stops polling for latencies,
+  // we do not want the sampled data to pile up into a memory leak; so once
+  // LATENCY_BUFFER_SIZE items have been enqueued for processing, further
+  // fs latency samples are silently dropped.
+  private static final int LATENCY_BUFFER_SIZE = 5000;
+  private static final BlockingQueue<Long> fsReadLatenciesNanos =
+      new ArrayBlockingQueue<Long>(LATENCY_BUFFER_SIZE);
+  private static final BlockingQueue<Long> fsWriteLatenciesNanos =
+      new ArrayBlockingQueue<Long>(LATENCY_BUFFER_SIZE);
+  private static final BlockingQueue<Long> fsPreadLatenciesNanos =
+      new ArrayBlockingQueue<Long>(LATENCY_BUFFER_SIZE);
+
+  public static final void offerReadLatency(long latencyNanos, boolean pread) {
+    if (pread) {
+      fsPreadLatenciesNanos.offer(latencyNanos); // might be silently dropped, if the queue is full
+      preadOps.incrementAndGet();
+      preadTimeNano.addAndGet(latencyNanos);
+    } else {
+      fsReadLatenciesNanos.offer(latencyNanos); // might be silently dropped, if the queue is full
+      readTimeNano.addAndGet(latencyNanos);
+      readOps.incrementAndGet();
+    }
+  }
+
+  public static final void offerWriteLatency(long latencyNanos) {
+    fsWriteLatenciesNanos.offer(latencyNanos); // might be silently dropped, if the queue is full
+
+    writeTimeNano.addAndGet(latencyNanos);
+    writeOps.incrementAndGet();
+  }
+
+  public static final Collection<Long> getReadLatenciesNanos() {
+    final List<Long> latencies =
+        Lists.newArrayListWithCapacity(fsReadLatenciesNanos.size());
+    fsReadLatenciesNanos.drainTo(latencies);
+    return latencies;
+  }
+
+  public static final Collection<Long> getPreadLatenciesNanos() {
+    final List<Long> latencies =
+        Lists.newArrayListWithCapacity(fsPreadLatenciesNanos.size());
+    fsPreadLatenciesNanos.drainTo(latencies);
+    return latencies;
+  }
+
+  public static final Collection<Long> getWriteLatenciesNanos() {
+    final List<Long> latencies =
+        Lists.newArrayListWithCapacity(fsWriteLatenciesNanos.size());
+    fsWriteLatenciesNanos.drainTo(latencies);
+    return latencies;
+  }
+
   // for test purpose
   public static final AtomicLong dataBlockReadCnt = new AtomicLong(0);
 
+  // number of sequential reads
+  public static final int getReadOps() {
+    return readOps.getAndSet(0);
+  }
+
+  public static final long getReadTimeMs() {
+    return readTimeNano.getAndSet(0) / 1000000;
+  }
+
+  // number of positional reads
+  public static final int getPreadOps() {
+    return preadOps.getAndSet(0);
+  }
+
+  public static final long getPreadTimeMs() {
+    return preadTimeNano.getAndSet(0) / 1000000;
+  }
+
+  public static final int getWriteOps() {
+    return writeOps.getAndSet(0);
+  }
+
+  public static final long getWriteTimeMs() {
+    return writeTimeNano.getAndSet(0) / 1000000;
+  }
+
   /**
    * Number of checksum verification failures. It also
    * clears the counter.
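The buffer added above is the core of the sampling scheme: producers offer() without blocking, the metrics thread drainTo()s on its own schedule, and a full queue sheds samples instead of growing. A self-contained sketch of the same pattern (illustrative names only):

    // Standalone sketch of the bounded sampling buffer: offer() drops samples
    // when the consumer stops draining, so an idle metrics thread cannot cause
    // unbounded memory growth.
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    public class BoundedSamplerSketch {
      private static final int CAPACITY = 5000;
      private static final BlockingQueue<Long> samples = new ArrayBlockingQueue<>(CAPACITY);

      // producer side: never blocks, silently drops when full
      static boolean record(long latencyNanos) {
        return samples.offer(latencyNanos);
      }

      // consumer side: drainTo empties the queue in one non-blocking call
      static List<Long> drain() {
        List<Long> out = new ArrayList<>(samples.size());
        samples.drainTo(out);
        return out;
      }

      public static void main(String[] args) {
        for (long i = 0; i < CAPACITY + 10; i++) {
          record(i); // the last 10 offers return false and are dropped
        }
        System.out.println(drain().size()); // prints 5000
      }
    }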
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
index a873280..6e6f89f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
@@ -1411,12 +1411,12 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
       if (block == -1)
         return null;
       long blockSize = metaBlockIndexReader.getRootBlockDataSize(block);
+      long startTimeNs = System.nanoTime();
 
       // Per meta key from any given file, synchronize reads for said block. This
       // is OK to do for meta blocks because the meta block index is always
       // single-level.
-      synchronized (metaBlockIndexReader
-          .getRootBlockKey(block)) {
+      synchronized (metaBlockIndexReader.getRootBlockKey(block)) {
         // Check cache for block. If found return.
         long metaBlockOffset = metaBlockIndexReader.getRootBlockOffset(block);
         BlockCacheKey cacheKey = new BlockCacheKey(name, metaBlockOffset,
@@ -1437,6 +1437,8 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
 
       HFileBlock metaBlock = fsBlockReader.readBlockData(metaBlockOffset,
           blockSize, -1, true).unpack(hfileContext, fsBlockReader);
+      final long delta = System.nanoTime() - startTimeNs;
+      HFile.offerReadLatency(delta, true);
 
       // Cache the block
       if (cacheBlock) {
@@ -1517,11 +1519,14 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
           traceScope.getSpan().addTimelineAnnotation("blockCacheMiss");
         }
         // Load block from filesystem.
+        long startTimeNs = System.nanoTime();
         HFileBlock hfileBlock = fsBlockReader.readBlockData(dataBlockOffset, onDiskBlockSize, -1,
             pread);
         validateBlockType(hfileBlock, expectedBlockType);
         HFileBlock unpacked = hfileBlock.unpack(hfileContext, fsBlockReader);
         BlockType.BlockCategory category = hfileBlock.getBlockType().getCategory();
+        final long delta = System.nanoTime() - startTimeNs;
+        HFile.offerReadLatency(delta, pread);
 
         // Cache the block if necessary
         if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) {
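Both reader hunks time the filesystem call the same way: stamp System.nanoTime() before readBlockData, take the delta after it, and pass it to HFile.offerReadLatency. A generic helper expressing that pattern might look like the following (hypothetical, not part of the patch); note it reports in a finally block, whereas the patch reports only on successful reads:

    // Generic "time an I/O action and hand the latency to a sink" helper.
    // Callable stands in for the filesystem read; LatencySink for HFile's sampler.
    import java.util.concurrent.Callable;

    public final class Timed {
      public interface LatencySink {
        void offer(long latencyNanos);
      }

      // Runs the supplied I/O action and reports its wall-clock latency in nanos.
      public static <T> T call(Callable<T> ioAction, LatencySink sink) throws Exception {
        long startTimeNs = System.nanoTime();
        try {
          return ioAction.call();
        } finally {
          // Reported even if the read throws; the patch itself skips failed reads.
          sink.offer(System.nanoTime() - startTimeNs);
        }
      }
    }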
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
index 186d86b..4d2e014 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
@@ -309,6 +309,7 @@ public class HFileWriterImpl implements HFile.Writer {
   private void finishBlock() throws IOException {
     if (!fsBlockWriter.isWriting() || fsBlockWriter.blockSizeWritten() == 0)
       return;
+    long startTimeNs = System.nanoTime();
 
     // Update the first data block offset for scanning.
     if (firstDataBlockOffset == -1) {
       firstDataBlockOffset = outputStream.getPos();
@@ -322,6 +323,7 @@ public class HFileWriterImpl implements HFile.Writer {
     dataBlockIndexWriter.addEntry(CellUtil.getCellKeySerializedAsKeyValueKey(indexEntry),
         lastDataBlockOffset, onDiskSize);
     totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
+    HFile.offerWriteLatency(System.nanoTime() - startTimeNs);
     if (cacheConf.shouldCacheDataOnWrite()) {
       doCacheOnWrite(lastDataBlockOffset);
     }

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java
index 91f494a..4f81688 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java
@@ -116,4 +116,16 @@ public class MetricsRegionServer {
   public void updateFlushTime(long t) {
     serverSource.updateFlushTime(t);
   }
+
+  public void updateFsReadTime(long t) {
+    serverSource.updateFsReadTime(t);
+  }
+
+  public void updateFsPReadTime(long t) {
+    serverSource.updateFsPReadTime(t);
+  }
+
+  public void updateFsWriteTime(long t) {
+    serverSource.updateFsWriteTime(t);
+  }
 }

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
index 777c960..d471240 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.io.hfile.BlockCache;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.CacheStats;
+import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.mob.MobCacheConfig;
 import org.apache.hadoop.hbase.mob.MobFileCache;
 import org.apache.hadoop.hbase.regionserver.wal.MetricsWALSource;
@@ -100,6 +101,9 @@ class MetricsRegionServerWrapperImpl
   private volatile long mobFileCacheEvictedCount = 0;
   private volatile long mobFileCacheCount = 0;
   private volatile long blockedRequestsCount = 0L;
+  private volatile long fsReadLatency = 0;
+  private volatile long fsPreadLatency = 0;
+  private volatile long fsWriteLatency = 0;
 
   private CacheStats cacheStats;
   private ScheduledExecutorService executor;
@@ -757,6 +761,29 @@ class MetricsRegionServerWrapperImpl
           mobFileCacheEvictedCount = mobFileCache.getEvictedFileCount();
           mobFileCacheCount = mobFileCache.getCacheSize();
           blockedRequestsCount = tempBlockedRequestsCount;
+          // Update the time-series gauges, whose values depend on the collection period.
+          int ops = HFile.getReadOps();
+          if (ops != 0) {
+            fsReadLatency = HFile.getReadTimeMs() / ops;
+          }
+          ops = HFile.getPreadOps();
+          if (ops != 0) {
+            fsPreadLatency = HFile.getPreadTimeMs() / ops;
+          }
+          ops = HFile.getWriteOps();
+          if (ops != 0) {
+            fsWriteLatency = HFile.getWriteTimeMs() / ops;
+          }
+          // Update the histograms with raw samples so latency spikes are not averaged away.
+          for (Long latency : HFile.getReadLatenciesNanos()) {
+            regionServer.metricsRegionServer.updateFsReadTime(latency / 1000000);
+          }
+          for (Long latency : HFile.getPreadLatenciesNanos()) {
+            regionServer.metricsRegionServer.updateFsPReadTime(latency / 1000000);
+          }
+          for (Long latency : HFile.getWriteLatenciesNanos()) {
+            regionServer.metricsRegionServer.updateFsWriteTime(latency / 1000000);
+          }
         } catch (Throwable e) {
           LOG.warn("Caught exception! Will suppress and retry.", e);
         }
@@ -777,4 +804,19 @@ class MetricsRegionServerWrapperImpl
   public long getBlockedRequestsCount() {
     return blockedRequestsCount;
   }
+
+  @Override
+  public long getFsReadLatency() {
+    return fsReadLatency;
+  }
+
+  @Override
+  public long getFsPreadLatency() {
+    return fsPreadLatency;
+  }
+
+  @Override
+  public long getFsWriteLatency() {
+    return fsWriteLatency;
+  }
 }
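Note that latency / 1000000 above is integer division, so a sub-millisecond sample lands in the histogram as 0; with HDFS reads this mostly affects OS-cached data. A small sketch of the truncation and a rounding alternative (the rounding is an option, not what the patch does):

    // Demonstrates nanosecond-to-millisecond truncation for a fast read.
    import java.util.concurrent.TimeUnit;

    public class NanosToMillis {
      public static void main(String[] args) {
        long fastReadNanos = 850_000L; // 0.85 ms, common for OS-buffered reads
        System.out.println(fastReadNanos / 1000000);                      // 0 (truncated)
        System.out.println(TimeUnit.NANOSECONDS.toMillis(fastReadNanos)); // 0 as well
        System.out.println((fastReadNanos + 500_000L) / 1_000_000L);      // 1 (round half up)
      }
    }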
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java
index 95a7cf5..4b1145c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java
@@ -384,4 +384,19 @@ public class MetricsRegionServerWrapperStub implements MetricsRegionServerWrapper {
   public double getMobFileCacheHitPercent() {
     return 50;
   }
+
+  @Override
+  public long getFsReadLatency() {
+    return 408;
+  }
+
+  @Override
+  public long getFsPreadLatency() {
+    return 675;
+  }
+
+  @Override
+  public long getFsWriteLatency() {
+    return 410;
+  }
 }
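The fixed values in the stub (408, 675, 410) give the metrics wiring something deterministic to assert against. A sketch of such a test, modeled on HBase's metrics test helpers (the test class itself is hypothetical and not part of this patch):

    // Sketch: verify the wrapper's values surface as the expected gauges.
    import org.apache.hadoop.hbase.CompatibilityFactory;
    import org.apache.hadoop.hbase.test.MetricsAssertHelper;
    import org.junit.Test;

    public class TestFsLatencyMetrics {
      private static final MetricsAssertHelper HELPER =
          CompatibilityFactory.getInstance(MetricsAssertHelper.class);

      @Test
      public void testFsLatencyGauges() {
        MetricsRegionServerWrapperStub wrapper = new MetricsRegionServerWrapperStub();
        MetricsRegionServer rsm = new MetricsRegionServer(wrapper);
        // Each gauge should surface exactly what the wrapper reports.
        HELPER.assertGauge("fsReadLatency", 408, rsm.getMetricsSource());
        HELPER.assertGauge("fsPreadLatency", 675, rsm.getMetricsSource());
        HELPER.assertGauge("fsWriteLatency", 410, rsm.getMetricsSource());
      }
    }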