From 635407965d76ffcebfbecf5de4fb8691511599bc Mon Sep 17 00:00:00 2001 From: Andrew Purtell Date: Tue, 12 Aug 2014 16:12:26 -0700 Subject: [PATCH] HBASE-11702 Better introspection of long running compactions --- .../regionserver/MetricsRegionServerSource.java | 8 +++++- .../regionserver/MetricsRegionServerWrapper.java | 15 ++++++++++ .../MetricsRegionServerSourceImpl.java | 6 ++++ .../apache/hadoop/hbase/regionserver/HStore.java | 32 +++++++++++++++++++++- .../MetricsRegionServerWrapperImpl.java | 27 +++++++++++++++++- .../apache/hadoop/hbase/regionserver/Store.java | 15 ++++++++++ .../compactions/CompactionProgress.java | 6 ++++ .../hbase/regionserver/compactions/Compactor.java | 17 +++++++++++- .../MetricsRegionServerWrapperStub.java | 15 ++++++++++ 9 files changed, 137 insertions(+), 4 deletions(-) diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java index ac5fb51..eede210 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java @@ -217,6 +217,12 @@ public interface MetricsRegionServerSource extends BaseSource { "The number of Increments that took over 1000ms to complete"; String SLOW_APPEND_DESC = "The number of Appends that took over 1000ms to complete"; - + String FLUSHED_CELLS = "flushedCellsCount"; + String FLUSHED_CELLS_DESC = "The number of cells flushed to disk"; + String COMPACTED_CELLS = "compactedCellsCount"; + String COMPACTED_CELLS_DESC = "The number of cells processed during minor compactions"; + String MAJOR_COMPACTED_CELLS = "majorCompactedCellsCount"; + String MAJOR_COMPACTED_CELLS_DESC = + "The number of cells processed during major compactions"; } diff --git 
a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java index 83b6da1..9e0e181 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java @@ -216,4 +216,19 @@ public interface MetricsRegionServerWrapper { * Get the amount of time that updates were blocked. */ long getUpdatesBlockedTime(); + + /** + * Get the number of cells flushed to disk. + */ + long getFlushedCellsCount(); + + /** + * Get the number of cells processed during minor compactions. + */ + long getCompactedCellsCount(); + + /** + * Get the number of cells processed during major compactions. + */ + long getMajorCompactedCellsCount(); } diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java index 1964b42..2a8bfa2 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java @@ -207,6 +207,12 @@ public class MetricsRegionServerSourceImpl BLOCK_CACHE_EXPRESS_HIT_PERCENT_DESC), rsWrap.getBlockCacheHitCachingPercent()) .addCounter(Interns.info(UPDATES_BLOCKED_TIME, UPDATES_BLOCKED_DESC), rsWrap.getUpdatesBlockedTime()) + .addCounter(Interns.info(FLUSHED_CELLS, FLUSHED_CELLS_DESC), + rsWrap.getFlushedCellsCount()) + .addCounter(Interns.info(COMPACTED_CELLS, COMPACTED_CELLS_DESC), + rsWrap.getCompactedCellsCount()) + .addCounter(Interns.info(MAJOR_COMPACTED_CELLS, MAJOR_COMPACTED_CELLS_DESC), + rsWrap.getMajorCompactedCellsCount()) 
.tag(Interns.info(ZOOKEEPER_QUORUM_NAME, ZOOKEEPER_QUORUM_DESC), rsWrap.getZookeeperQuorum()) .tag(Interns.info(SERVER_NAME_NAME, SERVER_NAME_DESC), rsWrap.getServerName()) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index f09f363..567b25a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -183,6 +183,10 @@ public class HStore implements Store { private Encryption.Context cryptoContext = Encryption.Context.NONE; + private volatile long flushedCellsCount = 0; + private volatile long compactedCellsCount = 0; + private volatile long majorCompactedCellsCount = 0; + /** * Constructor * @param region @@ -1101,6 +1105,11 @@ public class HStore implements Store { sfs = moveCompatedFilesIntoPlace(cr, newFiles); writeCompactionWalRecord(filesToCompact, sfs); replaceStoreFiles(filesToCompact, sfs); + if (cr.isMajor()) { + majorCompactedCellsCount += getCompactionProgress().totalCompactingKVs; + } else { + compactedCellsCount += getCompactionProgress().totalCompactingKVs; + } // At this point the store will use new files for all new scanners. completeCompaction(filesToCompact); // Archive old files & update store size. 
} finally { @@ -1924,6 +1933,7 @@ public class HStore implements Store { private class StoreFlusherImpl implements StoreFlushContext { private long cacheFlushSeqNum; + private long cacheFlushCount; private SortedSet snapshot; private List tempFiles; private TimeRangeTracker snapshotTimeRangeTracker; @@ -1941,6 +1951,7 @@ public class HStore implements Store { public void prepare() { memstore.snapshot(); this.snapshot = memstore.getSnapshot(); + this.cacheFlushCount = snapshot.size(); this.snapshotTimeRangeTracker = memstore.getSnapshotTimeRangeTracker(); } @@ -1981,6 +1992,9 @@ public class HStore implements Store { HStore.this.getCoprocessorHost().postFlush(HStore.this, sf); } } + + HStore.this.flushedCellsCount += cacheFlushCount; + // Add new file to store files. Clear snapshot too while we have the Store write lock. return HStore.this.updateStorefiles(storeFiles, snapshot); } @@ -1997,7 +2011,7 @@ public class HStore implements Store { } public static final long FIXED_OVERHEAD = - ClassSize.align(ClassSize.OBJECT + (16 * ClassSize.REFERENCE) + (4 * Bytes.SIZEOF_LONG) + ClassSize.align(ClassSize.OBJECT + (16 * ClassSize.REFERENCE) + (7 * Bytes.SIZEOF_LONG) + (5 * Bytes.SIZEOF_INT) + (2 * Bytes.SIZEOF_BOOLEAN)); public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD @@ -2033,4 +2047,20 @@ public class HStore implements Store { public boolean hasTooManyStoreFiles() { return getStorefilesCount() > this.blockingFileCount; } + + @Override + public long getFlushedCellsCount() { + return flushedCellsCount; + } + + @Override + public long getCompactedCellsCount() { + return compactedCellsCount; + } + + @Override + public long getMajorCompactedCellsCount() { + return majorCompactedCellsCount; + } + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java index bda7d0d..48f93cb 100644 --- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java @@ -66,6 +66,9 @@ class MetricsRegionServerWrapperImpl private volatile long numMutationsWithoutWAL = 0; private volatile long dataInMemoryWithoutWAL = 0; private volatile int percentFileLocal = 0; + private volatile long flushedCellsCount = 0; + private volatile long compactedCellsCount = 0; + private volatile long majorCompactedCellsCount = 0; private CacheStats cacheStats; private ScheduledExecutorService executor; @@ -353,6 +356,20 @@ class MetricsRegionServerWrapperImpl return this.regionServer.cacheFlusher.getUpdatesBlockedMsHighWater().get(); } + @Override + public long getFlushedCellsCount() { + return flushedCellsCount; + } + + @Override + public long getCompactedCellsCount() { + return compactedCellsCount; + } + + @Override + public long getMajorCompactedCellsCount() { + return majorCompactedCellsCount; + } /** * This is the runnable that will be executed on the executor every PERIOD number of seconds @@ -386,7 +403,9 @@ class MetricsRegionServerWrapperImpl long tempNumMutationsWithoutWAL = 0; long tempDataInMemoryWithoutWAL = 0; int tempPercentFileLocal = 0; - + long tempFlushedCellsCount = 0; + long tempCompactedCellsCount = 0; + long tempMajorCompactedCellsCount = 0; for (HRegion r : regionServer.getOnlineRegionsLocalContext()) { tempNumMutationsWithoutWAL += r.numMutationsWithoutWAL.get(); @@ -403,6 +422,9 @@ class MetricsRegionServerWrapperImpl tempStorefileIndexSize += store.getStorefilesIndexSize(); tempTotalStaticBloomSize += store.getTotalStaticBloomSize(); tempTotalStaticIndexSize += store.getTotalStaticIndexSize(); + tempFlushedCellsCount += store.getFlushedCellsCount(); + tempCompactedCellsCount += store.getCompactedCellsCount(); + tempMajorCompactedCellsCount += store.getMajorCompactedCellsCount(); } 
hdfsBlocksDistribution.add(r.getHDFSBlocksDistribution()); @@ -459,6 +481,9 @@ class MetricsRegionServerWrapperImpl numMutationsWithoutWAL = tempNumMutationsWithoutWAL; dataInMemoryWithoutWAL = tempDataInMemoryWithoutWAL; percentFileLocal = tempPercentFileLocal; + flushedCellsCount = tempFlushedCellsCount; + compactedCellsCount = tempCompactedCellsCount; + majorCompactedCellsCount = tempMajorCompactedCellsCount; } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index 8923769..d3cd3fd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -342,6 +342,21 @@ public interface Store extends HeapSize, StoreConfigInformation { TableName getTableName(); + /** + * @return The number of cells flushed to disk + */ + long getFlushedCellsCount(); + + /** + * @return The number of cells processed during minor compactions + */ + long getCompactedCellsCount(); + + /** + * @return The number of cells processed during major compactions + */ + long getMajorCompactedCellsCount(); + /* * @param o Observer who wants to know about changes in set of Readers */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java index 67eb622..96d75c5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java @@ -66,4 +66,10 @@ public class CompactionProgress { public void complete() { this.totalCompactingKVs = this.currentCompactedKVs; } + + @Override + public String toString() { + return String.format("%d/%d (%.2f%%)", currentCompactedKVs, 
totalCompactingKVs, + 100 * getProgressPct()); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java index 9e792c4..c076b7c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java @@ -210,8 +210,10 @@ public abstract class Compactor { // Since scanner.next() can return 'false' but still be delivering data, // we have to use a do/while loop. List kvs = new ArrayList(); - // Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME int closeCheckInterval = HStore.getCloseCheckInterval(); + // Bytes written since the last progress message; bytesWritten is reset at every close check + long bytesWrittenProgress = 0; + long lastMillis = LOG.isDebugEnabled() ? System.currentTimeMillis() : 0; boolean hasMore; do { hasMore = scanner.next(kvs, compactionKVMax); @@ -228,6 +230,19 @@ public abstract class Compactor { if (closeCheckInterval > 0) { bytesWritten += kv.getLength(); if (bytesWritten > closeCheckInterval) { + // Log the progress of long running compactions every minute if + // logging at DEBUG level + if (LOG.isDebugEnabled()) { + // Fold in the bytes counted since the last close check before they are reset below + bytesWrittenProgress += bytesWritten; + long now = System.currentTimeMillis(); + if ((now - lastMillis) >= 60 * 1000) { + LOG.debug("Compaction progress: " + progress + String.format(", rate=%.2f kB/sec", + (bytesWrittenProgress / 1024.0) / ((now - lastMillis) / 1000.0))); + lastMillis = now; + bytesWrittenProgress = 0; + } + } bytesWritten = 0; if (!store.areWritesEnabled()) { progress.cancel(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java index 31686f5..856591e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java +++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java @@ -211,4 +211,19 @@ public class MetricsRegionServerWrapperStub implements MetricsRegionServerWrappe return 1024000; } + @Override + public long getFlushedCellsCount() { + return 100000000; + } + + @Override + public long getCompactedCellsCount() { + return 10000000; + } + + @Override + public long getMajorCompactedCellsCount() { + return 1000000; + } + } \ No newline at end of file -- 1.8.5.2 (Apple Git-48)