diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java index a80745d..696efa6 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java @@ -490,4 +490,17 @@ public interface MetricsRegionServerSource extends BaseSource, JvmPauseMonitorSo String AVERAGE_REGION_SIZE = "averageRegionSize"; String AVERAGE_REGION_SIZE_DESC = "Average region size over the region server including memstore and storefile sizes."; + + String FLUSH_HDFS_WRITE_SIZE = "flushHdfsWriteSize"; + String FLUSH_HDFS_WRITE_SIZE_DESC = "Total number of bytes that is write to HDFS by flush."; + String COMPACT_HDFS_WRITE_SIZE = "compactHdfsWriteSize"; + String COMPACT_HDFS_WRITE_SIZE_DESC = "Total number of bytes that is write to HDFS by compact."; + String COMPACT_HDFS_READ_SIZE = "compactHdfsReadSize"; + String COMPACT_HDFS_READ_SIZE_DESC = "Total number of bytes that is read from HDFS by compact."; + String HANDLER_HDFS_READ_SIZE = "hanlderHdfsReadSize"; + String HANDLER_HDFS_READ_SIZE_DESC = "Total number of bytes that is read from HDFS by handler."; + String OTHER_HDFS_WRITE_SIZE = "otherHdfsWriteSize"; + String OTHER_HDFS_WRITE_SIZE_DESC = "Total number of bytes that is write to HDFS by other thread."; + String OTHER_HDFS_READ_SIZE = "otherHdfsReadSize"; + String OTHER_HDFS_READ_SIZE_DESC = "Total number of bytes that is read from HDFS by other thread."; } diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java index 0aa625c..8eccdd9 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java @@ -457,4 +457,16 @@ public interface MetricsRegionServerWrapper { long getDeleteFamilyBloomHitCount(); long getTrailerHitCount(); + + long getFlushHdfsWriteSize(); + + long getCompactHdfsWriteSize(); + + long getCompactHdfsReadSize(); + + long getHandlerHdfsReadSize(); + + long getOtherHdfsWriteSize(); + + long getOtherHdfsReadSize(); } diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java index b412fd1..4b7219d 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java @@ -497,6 +497,19 @@ public class MetricsRegionServerSourceImpl .addGauge(Interns.info(MOB_FILE_CACHE_HIT_PERCENT, MOB_FILE_CACHE_HIT_PERCENT_DESC), rsWrap.getMobFileCacheHitPercent()) + .addGauge(Interns.info(FLUSH_HDFS_WRITE_SIZE, FLUSH_HDFS_WRITE_SIZE_DESC), + rsWrap.getFlushHdfsWriteSize()) + .addGauge(Interns.info(COMPACT_HDFS_WRITE_SIZE, COMPACT_HDFS_WRITE_SIZE_DESC), + rsWrap.getCompactHdfsWriteSize()) + .addGauge(Interns.info(COMPACT_HDFS_READ_SIZE, COMPACT_HDFS_READ_SIZE_DESC), + rsWrap.getCompactHdfsReadSize()) + .addGauge(Interns.info(HANDLER_HDFS_READ_SIZE, HANDLER_HDFS_READ_SIZE_DESC), + rsWrap.getHandlerHdfsReadSize()) + .addGauge(Interns.info(OTHER_HDFS_WRITE_SIZE, OTHER_HDFS_WRITE_SIZE_DESC), + rsWrap.getOtherHdfsWriteSize()) + .addGauge(Interns.info(OTHER_HDFS_READ_SIZE, OTHER_HDFS_READ_SIZE_DESC), + rsWrap.getOtherHdfsReadSize()) + .addCounter(Interns.info(HEDGED_READS, HEDGED_READS_DESC), rsWrap.getHedgedReadOps()) .addCounter(Interns.info(HEDGED_READ_WINS, HEDGED_READ_WINS_DESC), rsWrap.getHedgedReadWins()) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java index 8582dbe..042a352 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java @@ -34,11 +34,10 @@ import java.util.Map; import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicLong; -import org.apache.hadoop.hbase.util.ByteStringer; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; @@ -49,6 +48,7 @@ import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.fs.HFileSystem; import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; import org.apache.hadoop.hbase.io.compress.Compression; @@ -59,9 +59,12 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair; import org.apache.hadoop.hbase.protobuf.generated.HFileProtos; import org.apache.hadoop.hbase.util.BloomFilterWriter; +import org.apache.hadoop.hbase.util.ByteStringer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Counter; import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.ThreadUtil; +import org.apache.hadoop.hbase.util.ThreadUtil.ThreadType; import org.apache.hadoop.io.Writable; import com.google.common.annotations.VisibleForTesting; @@ -185,6 +188,72 @@ public class HFile { // For tests. Gets incremented when we read a block whether from HDFS or from Cache. public static final Counter DATABLOCK_READ_COUNT = new Counter(); + public static final AtomicLong FLUSH_HDFS_WRITE_SZIE = new AtomicLong(); + + public static final AtomicLong COMPACT_HDFS_READ_SIZE = new AtomicLong(); + + public static final AtomicLong COMPACT_HDFS_WRITE_SIZE = new AtomicLong(); + + public static final AtomicLong HANDLER_HDFS_READ_SIZE = new AtomicLong(); + + public static final AtomicLong OTHER_HDFS_READ_SIZE = new AtomicLong(); + + public static final AtomicLong OTHER_HDFS_WRITE_SIZE = new AtomicLong(); + + public static void updateWriteSize(int size) { + ThreadType type = ThreadUtil.CurType.get(); + if (type != null) { + if (type == ThreadType.FLUSH) { + FLUSH_HDFS_WRITE_SZIE.addAndGet(size); + } else if (type == ThreadType.COMPACT) { + COMPACT_HDFS_WRITE_SIZE.addAndGet(size); + } else { + OTHER_HDFS_WRITE_SIZE.addAndGet(size); + } + } else { + OTHER_HDFS_WRITE_SIZE.addAndGet(size); + } + } + + public static void updateReadSize(int size) { + ThreadType type = ThreadUtil.CurType.get(); + if (type != null) { + if (type == ThreadType.HANDLER) { + HANDLER_HDFS_READ_SIZE.addAndGet(size); + } else if (type == ThreadType.COMPACT) { + COMPACT_HDFS_READ_SIZE.addAndGet(size); + } else { + OTHER_HDFS_READ_SIZE.addAndGet(size); + } + } else { + OTHER_HDFS_READ_SIZE.addAndGet(size); + } + } + + public static long getFlushHdfsWriteSize() { + return FLUSH_HDFS_WRITE_SZIE.get(); + } + + public static long getCompactHdfsReadSize() { + return COMPACT_HDFS_READ_SIZE.get(); + } + + public static long getCompactHdfsWriteSize() { + return COMPACT_HDFS_WRITE_SIZE.get(); + } + + public static long getHandlerHdfsReadSize() { + return HANDLER_HDFS_READ_SIZE.get(); + } + + public static long getOtherHdfsReadSize() { + return OTHER_HDFS_READ_SIZE.get(); + } + + public static long getOtherHdfsWriteSize() { + return OTHER_HDFS_WRITE_SIZE.get(); + } + /** * Number of checksum verification failures. It also * clears the counter. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java index 14a5cd1..51e718c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java @@ -35,10 +35,10 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.fs.HFileSystem; -import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; import org.apache.hadoop.hbase.io.ByteArrayOutputStream; import org.apache.hadoop.hbase.io.ByteBuffInputStream; import org.apache.hadoop.hbase.io.ByteBufferSupportDataOutputStream; +import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext; import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext; @@ -1098,6 +1098,8 @@ public class HFileBlock implements Cacheable { ensureBlockReady(); out.write(onDiskBlockBytesWithHeader); out.write(onDiskChecksum); + HFile.updateWriteSize(onDiskBlockBytesWithHeader.length + + onDiskChecksum.length); } /** @@ -1519,7 +1521,7 @@ public class HFileBlock implements Cacheable { return -1; } } - + HFile.updateReadSize(size); assert peekIntoNextBlock; return Bytes.toInt(dest, destOffset + size + BlockType.MAGIC_LENGTH) + hdrSize; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathBalancedQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathBalancedQueueRpcExecutor.java index 4e06f4f..4139706 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathBalancedQueueRpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathBalancedQueueRpcExecutor.java @@ -22,11 +22,12 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.Semaphore; -import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.ThreadUtil; +import org.apache.hadoop.hbase.util.ThreadUtil.ThreadType; /** * Balanced queue executor with a fastpath. Because this is FIFO, it has no respect for @@ -92,6 +93,7 @@ public class FastPathBalancedQueueRpcExecutor extends BalancedQueueRpcExecutor { final Deque fastPathHandlerStack) { super(name, handlerFailureThreshhold, q); this.fastPathHandlerStack = fastPathHandlerStack; + ThreadUtil.CurType.set(ThreadType.HANDLER); } protected CallRunner getCallRunner() throws InterruptedException { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java index 5b6c6c8..7d9656d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java @@ -33,6 +33,8 @@ import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; +import org.apache.hadoop.hbase.util.ThreadUtil; +import org.apache.hadoop.hbase.util.ThreadUtil.ThreadType; import org.apache.hadoop.util.StringUtils; import com.google.common.base.Preconditions; @@ -150,6 +152,7 @@ public abstract class RpcExecutor { setDaemon(true); this.q = q; this.handlerFailureThreshhold = handlerFailureThreshhold; + ThreadUtil.CurType.set(ThreadType.HANDLER); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java index c1f82b9..2e22df7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java @@ -46,6 +46,8 @@ import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.StealJobQueue; +import org.apache.hadoop.hbase.util.ThreadUtil; +import org.apache.hadoop.hbase.util.ThreadUtil.ThreadType; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.util.StringUtils; @@ -561,6 +563,7 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi @Override public void run() { + ThreadUtil.CurType.set(ThreadType.COMPACT); Preconditions.checkNotNull(server); if (server.isStopped() || (region.getTableDesc() != null && !region.getTableDesc().isCompactionEnabled())) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java index a69d8c0..59a7120 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java @@ -53,6 +53,8 @@ import org.apache.hadoop.hbase.util.Counter; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.HasThread; import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; +import org.apache.hadoop.hbase.util.ThreadUtil; +import org.apache.hadoop.hbase.util.ThreadUtil.ThreadType; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.util.StringUtils; @@ -232,6 +234,7 @@ class MemStoreFlusher implements FlushRequester { @Override public void run() { + ThreadUtil.CurType.set(ThreadType.FLUSH); while (!server.isStopped()) { FlushQueueEntry fqe = null; try { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java index d65365e..f76417a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java @@ -26,15 +26,16 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.CompatibilitySingletonFactory; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HDFSBlocksDistribution; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.io.hfile.BlockCache; import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.CacheStats; +import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.mob.MobCacheConfig; import org.apache.hadoop.hbase.mob.MobFileCache; import org.apache.hadoop.hbase.regionserver.wal.MetricsWALSource; @@ -105,6 +106,12 @@ class MetricsRegionServerWrapperImpl private volatile long mobFileCacheCount = 0; private volatile long blockedRequestsCount = 0L; private volatile long averageRegionSize = 0L; + private volatile long flushHdfsWriteSize = 0; + private volatile long compactHDFSWriteSize = 0; + private volatile long compactHDFSReadSize = 0L; + private volatile long handlerHdfsReadSize = 0L; + private volatile long otherHDFSWriteSize = 0; + private volatile long otherHDFSReadSize = 0L; private CacheStats cacheStats; private ScheduledExecutorService executor; @@ -815,6 +822,12 @@ class MetricsRegionServerWrapperImpl mobFileCacheEvictedCount = mobFileCache.getEvictedFileCount(); mobFileCacheCount = mobFileCache.getCacheSize(); blockedRequestsCount = tempBlockedRequestsCount; + flushHdfsWriteSize = HFile.getFlushHdfsWriteSize(); + compactHDFSWriteSize = HFile.getCompactHdfsWriteSize(); + compactHDFSReadSize = HFile.getCompactHdfsReadSize(); + handlerHdfsReadSize = HFile.getHandlerHdfsReadSize(); + otherHDFSWriteSize = HFile.getOtherHdfsWriteSize(); + otherHDFSReadSize = HFile.getOtherHdfsReadSize(); } catch (Throwable e) { LOG.warn("Caught exception! Will suppress and retry.", e); } @@ -999,4 +1012,34 @@ class MetricsRegionServerWrapperImpl } return cacheStats.getTrailerHitCount(); } + + @Override + public long getFlushHdfsWriteSize() { + return flushHdfsWriteSize; + } + + @Override + public long getCompactHdfsWriteSize() { + return compactHDFSWriteSize; + } + + @Override + public long getCompactHdfsReadSize() { + return compactHDFSReadSize; + } + + @Override + public long getHandlerHdfsReadSize() { + return handlerHdfsReadSize; + } + + @Override + public long getOtherHdfsWriteSize() { + return otherHDFSWriteSize; + } + + @Override + public long getOtherHdfsReadSize() { + return otherHDFSReadSize; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ThreadUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ThreadUtil.java new file mode 100644 index 0000000..de4910c --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ThreadUtil.java @@ -0,0 +1,32 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +public class ThreadUtil { + + public enum ThreadType { + HANDLER(), + FLUSH(), + COMPACT(), + OTHER; + } + + public static final ThreadLocal CurType = new ThreadLocal(); + +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java index 6e4828c..7cdbde8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java @@ -509,4 +509,34 @@ public class MetricsRegionServerWrapperStub implements MetricsRegionServerWrappe public long getAverageRegionSize() { return 10000000; } + + @Override + public long getFlushHdfsWriteSize() { + return 0; + } + + @Override + public long getCompactHdfsWriteSize() { + return 0; + } + + @Override + public long getCompactHdfsReadSize() { + return 0; + } + + @Override + public long getHandlerHdfsReadSize() { + return 0; + } + + @Override + public long getOtherHdfsWriteSize() { + return 0; + } + + @Override + public long getOtherHdfsReadSize() { + return 0; + } }