From 94750d020f8c65e1de9d1540e3c7b1d15fbfd817 Mon Sep 17 00:00:00 2001 From: Andrew Purtell Date: Tue, 12 Aug 2014 16:12:26 -0700 Subject: [PATCH] HBASE-11702 Better introspection of long running compactions --- .../regionserver/MetricsRegionServerSource.java | 8 +++++- .../regionserver/MetricsRegionServerWrapper.java | 15 ++++++++++ .../MetricsRegionServerSourceImpl.java | 6 ++++ .../apache/hadoop/hbase/regionserver/HStore.java | 32 +++++++++++++++++++++- .../MetricsRegionServerWrapperImpl.java | 27 +++++++++++++++++- .../apache/hadoop/hbase/regionserver/Store.java | 15 ++++++++++ .../compactions/CompactionProgress.java | 6 ++++ .../hbase/regionserver/compactions/Compactor.java | 17 +++++++++++- .../MetricsRegionServerWrapperStub.java | 15 ++++++++++ 9 files changed, 137 insertions(+), 4 deletions(-) diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java index cbe5854..fb443b3c 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java @@ -217,6 +217,12 @@ public interface MetricsRegionServerSource extends BaseSource { "The number of Increments that took over 1000ms to complete"; String SLOW_APPEND_DESC = "The number of Appends that took over 1000ms to complete"; - + String FLUSHED_CELLS = "flushedCellsCount"; + String FLUSHED_CELLS_DESC = "The number of cells flushed to disk"; + String COMPACTED_CELLS = "compactedCellsCount"; + String COMPACTED_CELLS_DESC = "The number of cells processed during minor compactions"; + String MAJOR_COMPACTED_CELLS = "majorCompactedCellsCount"; + String MAJOR_COMPACTED_CELLS_DESC = + "The number of cells processed during major compactions"; } diff --git 
a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java index 057c48d..84648c2 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java @@ -216,4 +216,19 @@ public interface MetricsRegionServerWrapper { * Get the amount of time that updates were blocked. */ long getUpdatesBlockedTime(); + + /** + * Get the number of cells flushed to disk. + */ + long getFlushedCellsCount(); + + /** + * Get the number of cells processed during minor compactions. + */ + long getCompactedCellsCount(); + + /** + * Get the number of cells processed during major compactions. + */ + long getMajorCompactedCellsCount(); } diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java index 1964b42..2a8bfa2 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java @@ -207,6 +207,12 @@ public class MetricsRegionServerSourceImpl BLOCK_CACHE_EXPRESS_HIT_PERCENT_DESC), rsWrap.getBlockCacheHitCachingPercent()) .addCounter(Interns.info(UPDATES_BLOCKED_TIME, UPDATES_BLOCKED_DESC), rsWrap.getUpdatesBlockedTime()) + .addCounter(Interns.info(FLUSHED_CELLS, FLUSHED_CELLS_DESC), + rsWrap.getFlushedCellsCount()) + .addCounter(Interns.info(COMPACTED_CELLS, COMPACTED_CELLS_DESC), + rsWrap.getCompactedCellsCount()) + .addCounter(Interns.info(MAJOR_COMPACTED_CELLS, MAJOR_COMPACTED_CELLS_DESC), + rsWrap.getMajorCompactedCellsCount()) 
.tag(Interns.info(ZOOKEEPER_QUORUM_NAME, ZOOKEEPER_QUORUM_DESC), rsWrap.getZookeeperQuorum()) .tag(Interns.info(SERVER_NAME_NAME, SERVER_NAME_DESC), rsWrap.getServerName()) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index c783d14..96d5fcf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -189,6 +189,10 @@ public class HStore implements Store { private Encryption.Context cryptoContext = Encryption.Context.NONE; + private volatile long flushedCellsCount = 0; + private volatile long compactedCellsCount = 0; + private volatile long majorCompactedCellsCount = 0; + /** * Constructor * @param region @@ -1157,6 +1161,11 @@ public class HStore implements Store { sfs = moveCompatedFilesIntoPlace(cr, newFiles); writeCompactionWalRecord(filesToCompact, sfs); replaceStoreFiles(filesToCompact, sfs); + if (cr.isMajor()) { + majorCompactedCellsCount += getCompactionProgress().totalCompactingKVs; + } else { + compactedCellsCount += getCompactionProgress().totalCompactingKVs; + } // At this point the store will use new files for all new scanners. completeCompaction(filesToCompact, true); // Archive old files & update store size. 
} finally { @@ -2037,6 +2046,7 @@ public class HStore implements Store { private MemStoreSnapshot snapshot; private List tempFiles; private List committedFiles; + private long cacheFlushCount; private StoreFlusherImpl(long cacheFlushSeqNum) { this.cacheFlushSeqNum = cacheFlushSeqNum; @@ -2049,6 +2059,7 @@ public class HStore implements Store { @Override public void prepare() { this.snapshot = memstore.snapshot(); + this.cacheFlushCount = snapshot.getCellsCount(); committedFiles = new ArrayList(1); } @@ -2088,6 +2099,9 @@ public class HStore implements Store { } committedFiles.add(sf.getPath()); } + + HStore.this.flushedCellsCount += cacheFlushCount; + // Add new file to store files. Clear snapshot too while we have the Store write lock. return HStore.this.updateStorefiles(storeFiles, snapshot.getId()); } @@ -2109,7 +2123,7 @@ public class HStore implements Store { } public static final long FIXED_OVERHEAD = - ClassSize.align(ClassSize.OBJECT + (16 * ClassSize.REFERENCE) + (4 * Bytes.SIZEOF_LONG) + ClassSize.align(ClassSize.OBJECT + (16 * ClassSize.REFERENCE) + (7 * Bytes.SIZEOF_LONG) + (5 * Bytes.SIZEOF_INT) + (2 * Bytes.SIZEOF_BOOLEAN)); public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD @@ -2145,4 +2159,20 @@ public class HStore implements Store { public boolean hasTooManyStoreFiles() { return getStorefilesCount() > this.blockingFileCount; } + + @Override + public long getFlushedCellsCount() { + return flushedCellsCount; + } + + @Override + public long getCompactedCellsCount() { + return compactedCellsCount; + } + + @Override + public long getMajorCompactedCellsCount() { + return majorCompactedCellsCount; + } + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java index 3654835..3c012f5 100644 --- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java @@ -66,6 +66,9 @@ class MetricsRegionServerWrapperImpl private volatile long numMutationsWithoutWAL = 0; private volatile long dataInMemoryWithoutWAL = 0; private volatile int percentFileLocal = 0; + private volatile long flushedCellsCount = 0; + private volatile long compactedCellsCount = 0; + private volatile long majorCompactedCellsCount = 0; private CacheStats cacheStats; private ScheduledExecutorService executor; @@ -353,6 +356,20 @@ class MetricsRegionServerWrapperImpl return this.regionServer.cacheFlusher.getUpdatesBlockedMsHighWater().get(); } + @Override + public long getFlushedCellsCount() { + return flushedCellsCount; + } + + @Override + public long getCompactedCellsCount() { + return compactedCellsCount; + } + + @Override + public long getMajorCompactedCellsCount() { + return majorCompactedCellsCount; + } /** * This is the runnable that will be executed on the executor every PERIOD number of seconds @@ -386,7 +403,9 @@ class MetricsRegionServerWrapperImpl long tempNumMutationsWithoutWAL = 0; long tempDataInMemoryWithoutWAL = 0; int tempPercentFileLocal = 0; - + long tempFlushedCellsCount = 0; + long tempCompactedCellsCount = 0; + long tempMajorCompactedCellsCount = 0; for (HRegion r : regionServer.getOnlineRegionsLocalContext()) { tempNumMutationsWithoutWAL += r.numMutationsWithoutWAL.get(); @@ -403,6 +422,9 @@ class MetricsRegionServerWrapperImpl tempStorefileIndexSize += store.getStorefilesIndexSize(); tempTotalStaticBloomSize += store.getTotalStaticBloomSize(); tempTotalStaticIndexSize += store.getTotalStaticIndexSize(); + tempFlushedCellsCount += store.getFlushedCellsCount(); + tempCompactedCellsCount += store.getCompactedCellsCount(); + tempMajorCompactedCellsCount += store.getMajorCompactedCellsCount(); } 
hdfsBlocksDistribution.add(r.getHDFSBlocksDistribution()); @@ -459,6 +481,9 @@ class MetricsRegionServerWrapperImpl numMutationsWithoutWAL = tempNumMutationsWithoutWAL; dataInMemoryWithoutWAL = tempDataInMemoryWithoutWAL; percentFileLocal = tempPercentFileLocal; + flushedCellsCount = tempFlushedCellsCount; + compactedCellsCount = tempCompactedCellsCount; + majorCompactedCellsCount = tempMajorCompactedCellsCount; } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index 280a1b8..57bbda8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -343,6 +343,21 @@ public interface Store extends HeapSize, StoreConfigInformation { TableName getTableName(); + /** + * @return The number of cells flushed to disk + */ + long getFlushedCellsCount(); + + /** + * @return The number of cells processed during minor compactions + */ + long getCompactedCellsCount(); + + /** + * @return The number of cells processed during major compactions + */ + long getMajorCompactedCellsCount(); + /* * @param o Observer who wants to know about changes in set of Readers */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java index 67eb622..96d75c5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java @@ -66,4 +66,10 @@ public class CompactionProgress { public void complete() { this.totalCompactingKVs = this.currentCompactedKVs; } + + @Override + public String toString() { + return String.format("%d/%d (%.2f%%)", currentCompactedKVs, 
totalCompactingKVs, + 100 * getProgressPct()); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java index a1d629a..8582d98 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java @@ -227,8 +227,14 @@ public abstract class Compactor { // Since scanner.next() can return 'false' but still be delivering data, // we have to use a do/while loop. List kvs = new ArrayList(); - // Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME int closeCheckInterval = HStore.getCloseCheckInterval(); + long lastMillis; + if (LOG.isDebugEnabled()) { + lastMillis = System.currentTimeMillis(); + } else { + lastMillis = 0; + } + long bytesWrittenProgress = 0; boolean hasMore; do { hasMore = scanner.next(kvs, compactionKVMax); @@ -245,6 +251,20 @@ public abstract class Compactor { if (closeCheckInterval > 0) { bytesWritten += kv.getLength(); if (bytesWritten > closeCheckInterval) { + // bytesWritten is reset on every close check, so keep a separate running + // total of the bytes written since the last progress message + bytesWrittenProgress += bytesWritten; + // Log the progress of long running compactions every minute if + // logging at DEBUG level + if (LOG.isDebugEnabled()) { + long now = System.currentTimeMillis(); + if ((now - lastMillis) >= 60 * 1000) { + LOG.debug("Compaction progress: " + progress + String.format(", rate=%.2f kB/sec", + (bytesWrittenProgress / 1024.0) / ((now - lastMillis) / 1000.0))); + lastMillis = now; + bytesWrittenProgress = 0; + } + } bytesWritten = 0; if (!store.areWritesEnabled()) { progress.cancel(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java index 875fe35..7a97361 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java +++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java @@ -211,4 +211,19 @@ public class MetricsRegionServerWrapperStub implements MetricsRegionServerWrappe return 1024000; } + @Override + public long getFlushedCellsCount() { + return 100000000; + } + + @Override + public long getCompactedCellsCount() { + return 10000000; + } + + @Override + public long getMajorCompactedCellsCount() { + return 1000000; + } + } \ No newline at end of file -- 1.8.5.2 (Apple Git-48)