commit d540e74fdcb6de8b85c295a41a97ab7d97579922
Author: Yu Li
Date:   Mon Jan 25 14:04:47 2016 +0800

    HBASE-15160 Put back HFile's HDFS op latency sampling code and add metrics for monitoring

    This partially reverts HBASE-11586 (commit 13643807)

diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java
index ee0217a..974918b 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java
@@ -327,4 +327,11 @@ public interface MetricsRegionServerSource extends BaseSource {
   String SPLIT_SUCCESS_KEY = "splitSuccessCount";
   String SPLIT_SUCCESS_DESC = "Number of successfully executed splits";
   String FLUSH_KEY = "flushTime";
+
+  String FS_READ_LATENCY = "fsReadLatency";
+  String FS_READ_LATENCY_DESC = "Latency of HFile's sequential reads on this region server.";
+  String FS_PREAD_LATENCY = "fsPreadLatency";
+  String FS_PREAD_LATENCY_DESC = "Latency of HFile's positional reads on this region server.";
+  String FS_WRITE_LATENCY = "fsWriteLatency";
+  String FS_WRITE_LATENCY_DESC = "Latency of HFile's writes on this region server.";
 }
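The six keys above name three gauges that the hadoop2-compat module registers on the region server's "Server" metrics source (see the MetricsRegionServerSourceImpl hunk below), so once the patch is applied they can be read like any other Hadoop metrics2 gauge, for example over JMX. A minimal probe sketch; the ObjectName follows the usual metrics2 convention for HBase region servers and is an assumption here, not something this patch defines:

import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class FsLatencyProbe {
  public static void main(String[] args) throws Exception {
    // Assumed MBean name (verify against your deployment):
    //   Hadoop:service=HBase,name=RegionServer,sub=Server
    // Run inside the region server JVM, or adapt to a remote JMXConnector.
    MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
    ObjectName rs = new ObjectName("Hadoop:service=HBase,name=RegionServer,sub=Server");
    for (String attr : new String[] { "fsReadLatency", "fsPreadLatency", "fsWriteLatency" }) {
      System.out.println(attr + " = " + mbs.getAttribute(rs, attr));
    }
  }
}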
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java
index 02dec8d..a2abf73 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java
@@ -367,4 +367,19 @@ public interface MetricsRegionServerWrapper {
   * @return Count of requests blocked because the memstore size is larger than blockingMemStoreSize
   */
  long getBlockedRequestsCount();
+
+  /**
+   * Get the latency of HFile's sequential reads.
+   */
+  long getFsReadLatency();
+
+  /**
+   * Get the latency of HFile's positional reads.
+   */
+  long getFsPreadLatency();
+
+  /**
+   * Get the latency of HFile's writes.
+   */
+  long getFSWriteLatency();
 }

diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java
index f40811c..0c2180d 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java
@@ -307,6 +307,12 @@ public class MetricsRegionServerSourceImpl
           .addCounter(Interns.info(BLOCKED_REQUESTS_COUNT, BLOCKED_REQUESTS_COUNT_DESC),
             rsWrap.getBlockedRequestsCount())
+          .addGauge(Interns.info(FS_READ_LATENCY, FS_READ_LATENCY_DESC), rsWrap.getFsReadLatency())
+          .addGauge(Interns.info(FS_PREAD_LATENCY, FS_PREAD_LATENCY_DESC),
+            rsWrap.getFsPreadLatency())
+          .addGauge(Interns.info(FS_WRITE_LATENCY, FS_WRITE_LATENCY_DESC),
+            rsWrap.getFSWriteLatency())
+
           .tag(Interns.info(ZOOKEEPER_QUORUM_NAME, ZOOKEEPER_QUORUM_DESC),
             rsWrap.getZookeeperQuorum())
           .tag(Interns.info(SERVER_NAME_NAME, SERVER_NAME_DESC), rsWrap.getServerName())

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
index 1e1835f..9bd9ae7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
@@ -34,12 +34,13 @@ import java.util.Map;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;

-import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -50,6 +51,7 @@ import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
 import org.apache.hadoop.hbase.io.compress.Compression;
@@ -60,12 +62,14 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair;
 import org.apache.hadoop.hbase.protobuf.generated.HFileProtos;
 import org.apache.hadoop.hbase.util.BloomFilterWriter;
+import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.io.Writable;

 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;

 /**
  * File format for hbase.
@@ -178,12 +182,102 @@ public class HFile {
    * The number of bytes per checksum.
    */
   public static final int DEFAULT_BYTES_PER_CHECKSUM = 16 * 1024;
+
+  // For measuring latency of "sequential" reads and writes
+  private static final AtomicInteger readOps = new AtomicInteger();
+  private static final AtomicLong readTimeNano = new AtomicLong();
+  private static final AtomicInteger writeOps = new AtomicInteger();
+  private static final AtomicLong writeTimeNano = new AtomicLong();
+
+  // For measuring latency of pread
+  private static final AtomicInteger preadOps = new AtomicInteger();
+  private static final AtomicLong preadTimeNano = new AtomicLong();
+
   // For measuring number of checksum failures
   static final AtomicLong checksumFailures = new AtomicLong();

+  // For getting more detailed stats on FS latencies.
+  // If, for some reason, the metrics subsystem stops polling for latencies,
+  // we don't want data to pile up in a memory leak, so after
+  // LATENCY_BUFFER_SIZE items have been enqueued for processing, fs latency
+  // stats will be dropped (and this behavior will be logged).
+  private static final int LATENCY_BUFFER_SIZE = 5000;
+  private static final BlockingQueue<Long> fsReadLatenciesNanos =
+      new ArrayBlockingQueue<Long>(LATENCY_BUFFER_SIZE);
+  private static final BlockingQueue<Long> fsWriteLatenciesNanos =
+      new ArrayBlockingQueue<Long>(LATENCY_BUFFER_SIZE);
+  private static final BlockingQueue<Long> fsPreadLatenciesNanos =
+      new ArrayBlockingQueue<Long>(LATENCY_BUFFER_SIZE);
+
+  public static final void offerReadLatency(long latencyNanos, boolean pread) {
+    if (pread) {
+      fsPreadLatenciesNanos.offer(latencyNanos); // might be silently dropped, if the queue is full
+      preadOps.incrementAndGet();
+      preadTimeNano.addAndGet(latencyNanos);
+    } else {
+      fsReadLatenciesNanos.offer(latencyNanos); // might be silently dropped, if the queue is full
+      readTimeNano.addAndGet(latencyNanos);
+      readOps.incrementAndGet();
+    }
+  }
+
+  public static final void offerWriteLatency(long latencyNanos) {
+    fsWriteLatenciesNanos.offer(latencyNanos); // might be silently dropped, if the queue is full
+
+    writeTimeNano.addAndGet(latencyNanos);
+    writeOps.incrementAndGet();
+  }
+
+  public static final Collection<Long> getReadLatenciesNanos() {
+    final List<Long> latencies =
+        Lists.newArrayListWithCapacity(fsReadLatenciesNanos.size());
+    fsReadLatenciesNanos.drainTo(latencies);
+    return latencies;
+  }
+
+  public static final Collection<Long> getPreadLatenciesNanos() {
+    final List<Long> latencies =
+        Lists.newArrayListWithCapacity(fsPreadLatenciesNanos.size());
+    fsPreadLatenciesNanos.drainTo(latencies);
+    return latencies;
+  }
+
+  public static final Collection<Long> getWriteLatenciesNanos() {
+    final List<Long> latencies =
+        Lists.newArrayListWithCapacity(fsWriteLatenciesNanos.size());
+    fsWriteLatenciesNanos.drainTo(latencies);
+    return latencies;
+  }
+
   // for test purpose
   public static final AtomicLong dataBlockReadCnt = new AtomicLong(0);

+  // number of sequential reads
+  public static final int getReadOps() {
+    return readOps.getAndSet(0);
+  }
+
+  public static final long getReadTimeMs() {
+    return readTimeNano.getAndSet(0) / 1000000;
+  }
+
+  // number of positional reads
+  public static final int getPreadOps() {
+    return preadOps.getAndSet(0);
+  }
+
+  public static final long getPreadTimeMs() {
+    return preadTimeNano.getAndSet(0) / 1000000;
+  }
+
+  public static final int getWriteOps() {
+    return writeOps.getAndSet(0);
+  }
+
+  public static final long getWriteTimeMs() {
+    return writeTimeNano.getAndSet(0) / 1000000;
+  }
+
   /**
    * Number of checksum verification failures. It also
    * clears the counter.
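The sampling scheme above pairs cheap atomic aggregates (an op counter plus total nanoseconds) with a bounded ArrayBlockingQueue of per-op samples. The non-blocking offer() is the key design choice: if the metrics subsystem stops draining, the queue fills to LATENCY_BUFFER_SIZE and further samples are simply dropped, so the read/write hot path never blocks and memory stays bounded. A self-contained sketch of that pattern, using illustrative names rather than HBase API:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/** Drop-on-overflow latency sampler mirroring HFile's scheme (illustrative). */
class LatencySampler {
  private static final int BUFFER_SIZE = 5000;
  private final BlockingQueue<Long> samples = new ArrayBlockingQueue<Long>(BUFFER_SIZE);
  private final AtomicInteger ops = new AtomicInteger();
  private final AtomicLong totalNanos = new AtomicLong();

  /** Hot path: never blocks; offer() returns false and drops the sample when full. */
  void record(long latencyNanos) {
    samples.offer(latencyNanos);
    ops.incrementAndGet();
    totalNanos.addAndGet(latencyNanos);
  }

  /** Poller path: drain the detailed samples accumulated since the last call. */
  List<Long> drain() {
    List<Long> out = new ArrayList<Long>(samples.size());
    samples.drainTo(out);
    return out;
  }

  /** Mean latency in ms since the last call, resetting the aggregates; 0 if idle. */
  long meanMsAndReset() {
    int n = ops.getAndSet(0);
    long totalMs = totalNanos.getAndSet(0) / 1000000;
    return n == 0 ? 0 : totalMs / n;
  }
}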
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
index 4db26d1..e2c52f5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
@@ -1410,12 +1410,12 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
     if (block == -1)
       return null;
     long blockSize = metaBlockIndexReader.getRootBlockDataSize(block);
+    long startTimeNs = System.nanoTime();

     // Per meta key from any given file, synchronize reads for said block. This
     // is OK to do for meta blocks because the meta block index is always
     // single-level.
-    synchronized (metaBlockIndexReader
-        .getRootBlockKey(block)) {
+    synchronized (metaBlockIndexReader.getRootBlockKey(block)) {
       // Check cache for block. If found return.
       long metaBlockOffset = metaBlockIndexReader.getRootBlockOffset(block);
       BlockCacheKey cacheKey = new BlockCacheKey(name, metaBlockOffset,
@@ -1436,6 +1436,8 @@

       HFileBlock metaBlock = fsBlockReader.readBlockData(metaBlockOffset,
           blockSize, -1, true).unpack(hfileContext, fsBlockReader);
+      final long delta = System.nanoTime() - startTimeNs;
+      HFile.offerReadLatency(delta, true);

       // Cache the block
       if (cacheBlock) {
@@ -1516,11 +1518,14 @@
         traceScope.getSpan().addTimelineAnnotation("blockCacheMiss");
       }
       // Load block from filesystem.
+      long startTimeNs = System.nanoTime();
       HFileBlock hfileBlock = fsBlockReader.readBlockData(dataBlockOffset, onDiskBlockSize, -1,
           pread);
       validateBlockType(hfileBlock, expectedBlockType);
       HFileBlock unpacked = hfileBlock.unpack(hfileContext, fsBlockReader);
       BlockType.BlockCategory category = hfileBlock.getBlockType().getCategory();
+      final long delta = System.nanoTime() - startTimeNs;
+      HFile.offerReadLatency(delta, pread);

       // Cache the block if necessary
       if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) {
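Note what the reader-side samples actually cover. On the data-block path the clock starts before readBlockData() and stops after unpack(), so the reported latency includes decompression, not just the filesystem call; on the meta-block path the clock starts before the synchronized block, so monitor wait and the block-cache lookup are counted too. In both cases a sample is recorded only when the block is actually read from the filesystem (a cache hit returns before the offer), and because the instrumentation is inlined rather than wrapped in try/finally, a read that throws records no sample. A hypothetical wrapper, not part of this patch, that would close that last gap:

import java.io.IOException;

import org.apache.hadoop.hbase.io.hfile.HFile;

final class TimedRead {
  interface IoCall<T> {
    T call() throws IOException;
  }

  /** Times io.call() and records the sample even if the call throws. */
  static <T> T timed(IoCall<T> io, boolean pread) throws IOException {
    long startTimeNs = System.nanoTime();
    try {
      return io.call();
    } finally {
      HFile.offerReadLatency(System.nanoTime() - startTimeNs, pread);
    }
  }
}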
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
index 186d86b..4d2e014 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
@@ -309,6 +309,7 @@ public class HFileWriterImpl implements HFile.Writer {
   private void finishBlock() throws IOException {
     if (!fsBlockWriter.isWriting() || fsBlockWriter.blockSizeWritten() == 0) return;

+    long startTimeNs = System.nanoTime();
     // Update the first data block offset for scanning.
     if (firstDataBlockOffset == -1) {
       firstDataBlockOffset = outputStream.getPos();
@@ -322,6 +323,7 @@
     dataBlockIndexWriter.addEntry(CellUtil.getCellKeySerializedAsKeyValueKey(indexEntry),
       lastDataBlockOffset, onDiskSize);
     totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
+    HFile.offerWriteLatency(System.nanoTime() - startTimeNs);
     if (cacheConf.shouldCacheDataOnWrite()) {
       doCacheOnWrite(lastDataBlockOffset);
     }

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
index f3e8916..d39f80a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.io.hfile.BlockCache;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.CacheStats;
+import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.mob.MobCacheConfig;
 import org.apache.hadoop.hbase.mob.MobFileCache;
 import org.apache.hadoop.hbase.regionserver.wal.MetricsWALSource;
@@ -99,6 +100,9 @@
   private volatile long mobFileCacheEvictedCount = 0;
   private volatile long mobFileCacheCount = 0;
   private volatile long blockedRequestsCount = 0L;
+  private volatile long fsReadLatency = 0;
+  private volatile long fsPreadLatency = 0;
+  private volatile long fsWriteLatency = 0;

   private CacheStats cacheStats;
   private ScheduledExecutorService executor;
@@ -728,6 +732,18 @@
           mobFileCacheEvictedCount = mobFileCache.getEvictedFileCount();
           mobFileCacheCount = mobFileCache.getCacheSize();
           blockedRequestsCount = tempBlockedRequestsCount;
+          int ops = HFile.getReadOps();
+          if (0 != ops) {
+            fsReadLatency = HFile.getReadTimeMs() / ops;
+          }
+          ops = HFile.getPreadOps();
+          if (0 != ops) {
+            fsPreadLatency = HFile.getPreadTimeMs() / ops;
+          }
+          ops = HFile.getWriteOps();
+          if (0 != ops) {
+            fsWriteLatency = HFile.getWriteTimeMs() / ops;
+          }
         } catch (Throwable e) {
           LOG.warn("Caught exception! Will suppress and retry.", e);
        }
@@ -748,4 +764,19 @@
   public long getBlockedRequestsCount() {
     return blockedRequestsCount;
   }
+
+  @Override
+  public long getFsReadLatency() {
+    return fsReadLatency;
+  }
+
+  @Override
+  public long getFsPreadLatency() {
+    return fsPreadLatency;
+  }
+
+  @Override
+  public long getFSWriteLatency() {
+    return fsWriteLatency;
+  }
 }

diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java
index 0d93284..e4c98d1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java
@@ -359,4 +359,19 @@ public class MetricsRegionServerWrapperStub implements MetricsRegionServerWrapper {
   public double getMobFileCacheHitPercent() {
     return 50;
   }
+
+  @Override
+  public long getFsReadLatency() {
+    return 408;
+  }
+
+  @Override
+  public long getFsPreadLatency() {
+    return 675;
+  }
+
+  @Override
+  public long getFSWriteLatency() {
+    return 410;
+  }
 }
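The wrapper computes each gauge as an integer mean over the metrics period: the HFile getters use getAndSet(0), so every poll consumes the op and time counters and reports the average latency, in milliseconds, of the operations completed since the previous poll. Two consequences worth knowing: when no ops completed, the zero-check keeps the previous value rather than writing zero, so an idle gauge goes stale instead of dropping; and because the op counter and the time counter are reset by separate calls, an operation finishing between the two reads can slightly skew one period (it washes out on the next poll). A worked example of the arithmetic, plain Java with made-up numbers:

public class AverageLatencyExample {
  public static void main(String[] args) {
    // Suppose 120 sequential reads completed this period, 1800 ms in total.
    int readOps = 120;
    long readTimeMs = 1800;
    // Same guard and integer division as the run() body above: 1800 / 120 = 15 ms.
    long fsReadLatency = 0;
    if (readOps != 0) {
      fsReadLatency = readTimeMs / readOps;
    }
    System.out.println("fsReadLatency = " + fsReadLatency + " ms");
  }
}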