From 6d28186af54b262ae4725bafcdc8917b15724939 Mon Sep 17 00:00:00 2001 From: Andrew Purtell Date: Wed, 13 Aug 2014 14:45:54 -0700 Subject: [PATCH] HBASE-11702 Better introspection of long running compactions --- .../regionserver/MetricsRegionServerSource.java | 15 ++++++ .../regionserver/MetricsRegionServerWrapper.java | 30 ++++++++++++ .../MetricsRegionServerSourceImpl.java | 9 ++++ .../MetricsRegionServerSourceImpl.java | 12 +++++ .../apache/hadoop/hbase/regionserver/HStore.java | 52 ++++++++++++++++++++- .../MetricsRegionServerWrapperImpl.java | 54 +++++++++++++++++++++- .../apache/hadoop/hbase/regionserver/Store.java | 30 ++++++++++++ .../compactions/CompactionProgress.java | 29 ++++++++++++ .../hbase/regionserver/compactions/Compactor.java | 18 +++++++- .../MetricsRegionServerWrapperStub.java | 30 ++++++++++++ 10 files changed, 276 insertions(+), 3 deletions(-) diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java index ac5fb51..b471c95 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java @@ -218,5 +218,20 @@ public interface MetricsRegionServerSource extends BaseSource { String SLOW_APPEND_DESC = "The number of Appends that took over 1000ms to complete"; + String FLUSHED_CELLS = "flushedCellsCount"; + String FLUSHED_CELLS_DESC = "The number of cells flushed to disk"; + String FLUSHED_CELLS_SIZE = "flushedCellsSize"; + String FLUSHED_CELLS_SIZE_DESC = "The total amount of data flushed to disk, in bytes"; + String COMPACTED_CELLS = "compactedCellsCount"; + String COMPACTED_CELLS_DESC = "The number of cells processed during minor compactions"; + String COMPACTED_CELLS_SIZE = "compactedCellsSize"; + String COMPACTED_CELLS_SIZE_DESC = + "The total amount of data processed during minor compactions, in bytes"; + String MAJOR_COMPACTED_CELLS = "majorCompactedCellsCount"; + String MAJOR_COMPACTED_CELLS_DESC = + "The number of cells processed during major compactions"; + String MAJOR_COMPACTED_CELLS_SIZE = "majorCompactedCellsSize"; + String MAJOR_COMPACTED_CELLS_SIZE_DESC = + "The total amount of data processed during major compactions, in bytes"; } diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java index 83b6da1..e95213c 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java @@ -216,4 +216,34 @@ public interface MetricsRegionServerWrapper { * Get the amount of time that updates were blocked. */ long getUpdatesBlockedTime(); + + /** + * Get the number of cells flushed to disk. + */ + long getFlushedCellsCount(); + + /** + * Get the number of cells processed during minor compactions. + */ + long getCompactedCellsCount(); + + /** + * Get the number of cells processed during major compactions. + */ + long getMajorCompactedCellsCount(); + + /** + * Get the total amount of data flushed to disk, in bytes. + */ + long getFlushedCellsSize(); + + /** + * Get the total amount of data processed during minor compactions, in bytes. + */ + long getCompactedCellsSize(); + + /** + * Get the total amount of data processed during major compactions, in bytes. + */ + long getMajorCompactedCellsSize(); } diff --git a/hbase-hadoop1-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java b/hbase-hadoop1-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java index db66750..be67541 100644 --- a/hbase-hadoop1-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java +++ b/hbase-hadoop1-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java @@ -202,6 +202,15 @@ public class MetricsRegionServerSourceImpl BLOCK_CACHE_EXPRESS_HIT_PERCENT_DESC, rsWrap.getBlockCacheHitCachingPercent()) .addCounter(UPDATES_BLOCKED_TIME, UPDATES_BLOCKED_DESC, rsWrap.getUpdatesBlockedTime()) + .addCounter(FLUSHED_CELLS, FLUSHED_CELLS_DESC, rsWrap.getFlushedCellsCount()) + .addCounter(COMPACTED_CELLS, COMPACTED_CELLS_DESC, rsWrap.getCompactedCellsCount()) + .addCounter(MAJOR_COMPACTED_CELLS, MAJOR_COMPACTED_CELLS_DESC, + rsWrap.getMajorCompactedCellsCount()) + .addCounter(FLUSHED_CELLS_SIZE, FLUSHED_CELLS_SIZE_DESC, rsWrap.getFlushedCellsSize()) + .addCounter(COMPACTED_CELLS_SIZE, COMPACTED_CELLS_SIZE_DESC, + rsWrap.getCompactedCellsSize()) + .addCounter(MAJOR_COMPACTED_CELLS_SIZE, MAJOR_COMPACTED_CELLS_SIZE_DESC, + rsWrap.getMajorCompactedCellsSize()) .tag(ZOOKEEPER_QUORUM_NAME, ZOOKEEPER_QUORUM_DESC, rsWrap.getZookeeperQuorum()) .tag(SERVER_NAME_NAME, SERVER_NAME_DESC, rsWrap.getServerName()) .tag(CLUSTER_ID_NAME, CLUSTER_ID_DESC, rsWrap.getClusterId()); diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java index 1964b42..365d2b2 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java @@ -207,6 +207,18 @@ public class MetricsRegionServerSourceImpl BLOCK_CACHE_EXPRESS_HIT_PERCENT_DESC), rsWrap.getBlockCacheHitCachingPercent()) .addCounter(Interns.info(UPDATES_BLOCKED_TIME, UPDATES_BLOCKED_DESC), rsWrap.getUpdatesBlockedTime()) + .addCounter(Interns.info(FLUSHED_CELLS, FLUSHED_CELLS_DESC), + rsWrap.getFlushedCellsCount()) + .addCounter(Interns.info(COMPACTED_CELLS, COMPACTED_CELLS_DESC), + rsWrap.getCompactedCellsCount()) + .addCounter(Interns.info(MAJOR_COMPACTED_CELLS, MAJOR_COMPACTED_CELLS_DESC), + rsWrap.getMajorCompactedCellsCount()) + .addCounter(Interns.info(FLUSHED_CELLS_SIZE, FLUSHED_CELLS_SIZE_DESC), + rsWrap.getFlushedCellsSize()) + .addCounter(Interns.info(COMPACTED_CELLS_SIZE, COMPACTED_CELLS_SIZE_DESC), + rsWrap.getCompactedCellsSize()) + .addCounter(Interns.info(MAJOR_COMPACTED_CELLS_SIZE, MAJOR_COMPACTED_CELLS_SIZE_DESC), + rsWrap.getMajorCompactedCellsSize()) .tag(Interns.info(ZOOKEEPER_QUORUM_NAME, ZOOKEEPER_QUORUM_DESC), rsWrap.getZookeeperQuorum()) .tag(Interns.info(SERVER_NAME_NAME, SERVER_NAME_DESC), rsWrap.getServerName()) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index f09f363..7cb5dc2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -183,6 +183,13 @@ public class HStore implements Store { private Encryption.Context cryptoContext = Encryption.Context.NONE; + private volatile long flushedCellsCount = 0; + private volatile long compactedCellsCount = 0; + private volatile long majorCompactedCellsCount = 0; + private volatile long flushedCellsSize = 0; + private volatile long compactedCellsSize = 0; + private volatile long majorCompactedCellsSize = 0; + /** * Constructor * @param region @@ -1101,6 +1108,13 @@ public class HStore implements Store { sfs = moveCompatedFilesIntoPlace(cr, newFiles); writeCompactionWalRecord(filesToCompact, sfs); replaceStoreFiles(filesToCompact, sfs); + if (cr.isMajor()) { + majorCompactedCellsCount += getCompactionProgress().totalCompactingKVs; + majorCompactedCellsSize += getCompactionProgress().totalCompactedSize; + } else { + compactedCellsCount += getCompactionProgress().totalCompactingKVs; + compactedCellsSize += getCompactionProgress().totalCompactedSize; + } // At this point the store will use new files for all new scanners. completeCompaction(filesToCompact); // Archive old files & update store size. } finally { @@ -1927,6 +1941,7 @@ public class HStore implements Store { private SortedSet snapshot; private List tempFiles; private TimeRangeTracker snapshotTimeRangeTracker; + private long flushedCount; private final AtomicLong flushedSize = new AtomicLong(); private StoreFlusherImpl(long cacheFlushSeqNum) { @@ -1942,6 +1957,7 @@ public class HStore implements Store { memstore.snapshot(); this.snapshot = memstore.getSnapshot(); this.snapshotTimeRangeTracker = memstore.getSnapshotTimeRangeTracker(); + this.flushedCount = this.snapshot.size(); } @Override @@ -1981,6 +1997,10 @@ public class HStore implements Store { HStore.this.getCoprocessorHost().postFlush(HStore.this, sf); } } + + HStore.this.flushedCellsCount += flushedCount; + HStore.this.flushedCellsSize += flushedSize.get(); + // Add new file to store files. Clear snapshot too while we have the Store write lock. return HStore.this.updateStorefiles(storeFiles, snapshot); } @@ -1997,7 +2017,7 @@ public class HStore implements Store { } public static final long FIXED_OVERHEAD = - ClassSize.align(ClassSize.OBJECT + (16 * ClassSize.REFERENCE) + (4 * Bytes.SIZEOF_LONG) + ClassSize.align(ClassSize.OBJECT + (16 * ClassSize.REFERENCE) + (10 * Bytes.SIZEOF_LONG) + (5 * Bytes.SIZEOF_INT) + (2 * Bytes.SIZEOF_BOOLEAN)); public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD @@ -2033,4 +2053,34 @@ public class HStore implements Store { public boolean hasTooManyStoreFiles() { return getStorefilesCount() > this.blockingFileCount; } + + @Override + public long getFlushedCellsCount() { + return flushedCellsCount; + } + + @Override + public long getFlushedCellsSize() { + return flushedCellsSize; + } + + @Override + public long getCompactedCellsCount() { + return compactedCellsCount; + } + + @Override + public long getCompactedCellsSize() { + return compactedCellsSize; + } + + @Override + public long getMajorCompactedCellsCount() { + return majorCompactedCellsCount; + } + + @Override + public long getMajorCompactedCellsSize() { + return majorCompactedCellsSize; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java index bda7d0d..8699b89 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java @@ -66,6 +66,12 @@ class MetricsRegionServerWrapperImpl private volatile long numMutationsWithoutWAL = 0; private volatile long dataInMemoryWithoutWAL = 0; private volatile int percentFileLocal = 0; + private volatile long flushedCellsCount = 0; + private volatile long compactedCellsCount = 0; + private volatile long majorCompactedCellsCount = 0; + private volatile long flushedCellsSize = 0; + private volatile long compactedCellsSize = 0; + private volatile long majorCompactedCellsSize = 0; private CacheStats cacheStats; private ScheduledExecutorService executor; @@ -353,6 +359,35 @@ class MetricsRegionServerWrapperImpl return this.regionServer.cacheFlusher.getUpdatesBlockedMsHighWater().get(); } + @Override + public long getFlushedCellsCount() { + return flushedCellsCount; + } + + @Override + public long getCompactedCellsCount() { + return compactedCellsCount; + } + + @Override + public long getMajorCompactedCellsCount() { + return majorCompactedCellsCount; + } + + @Override + public long getFlushedCellsSize() { + return flushedCellsSize; + } + + @Override + public long getCompactedCellsSize() { + return compactedCellsSize; + } + + @Override + public long getMajorCompactedCellsSize() { + return majorCompactedCellsSize; + } /** * This is the runnable that will be executed on the executor every PERIOD number of seconds @@ -386,7 +421,12 @@ class MetricsRegionServerWrapperImpl long tempNumMutationsWithoutWAL = 0; long tempDataInMemoryWithoutWAL = 0; int tempPercentFileLocal = 0; - + long tempFlushedCellsCount = 0; + long tempCompactedCellsCount = 0; + long tempMajorCompactedCellsCount = 0; + long tempFlushedCellsSize = 0; + long tempCompactedCellsSize = 0; + long tempMajorCompactedCellsSize = 0; for (HRegion r : regionServer.getOnlineRegionsLocalContext()) { tempNumMutationsWithoutWAL += r.numMutationsWithoutWAL.get(); @@ -403,6 +443,12 @@ class MetricsRegionServerWrapperImpl tempStorefileIndexSize += store.getStorefilesIndexSize(); tempTotalStaticBloomSize += store.getTotalStaticBloomSize(); tempTotalStaticIndexSize += store.getTotalStaticIndexSize(); + tempFlushedCellsCount += store.getFlushedCellsCount(); + tempCompactedCellsCount += store.getCompactedCellsCount(); + tempMajorCompactedCellsCount += store.getMajorCompactedCellsCount(); + tempFlushedCellsSize += store.getFlushedCellsSize(); + tempCompactedCellsSize += store.getCompactedCellsSize(); + tempMajorCompactedCellsSize += store.getMajorCompactedCellsSize(); } hdfsBlocksDistribution.add(r.getHDFSBlocksDistribution()); @@ -459,6 +505,12 @@ class MetricsRegionServerWrapperImpl numMutationsWithoutWAL = tempNumMutationsWithoutWAL; dataInMemoryWithoutWAL = tempDataInMemoryWithoutWAL; percentFileLocal = tempPercentFileLocal; + flushedCellsCount = tempFlushedCellsCount; + compactedCellsCount = tempCompactedCellsCount; + majorCompactedCellsCount = tempMajorCompactedCellsCount; + flushedCellsSize = tempFlushedCellsSize; + compactedCellsSize = tempCompactedCellsSize; + majorCompactedCellsSize = tempMajorCompactedCellsSize; } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index 8923769..896cd00 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -342,6 +342,36 @@ public interface Store extends HeapSize, StoreConfigInformation { TableName getTableName(); + /** + * @return The number of cells flushed to disk + */ + long getFlushedCellsCount(); + + /** + * @return The total size of data flushed to disk, in bytes + */ + long getFlushedCellsSize(); + + /** + * @return The number of cells processed during minor compactions + */ + long getCompactedCellsCount(); + + /** + * @return The total amount of data processed during minor compactions, in bytes + */ + long getCompactedCellsSize(); + + /** + * @return The number of cells processed during major compactions + */ + long getMajorCompactedCellsCount(); + + /** + * @return The total amount of data processed during major compactions, in bytes + */ + long getMajorCompactedCellsSize(); + /* * @param o Observer who wants to know about changes in set of Readers */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java index 67eb622..d9d74ee 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java @@ -37,6 +37,8 @@ public class CompactionProgress { public long totalCompactingKVs; /** the completed count of key values in currently running compaction */ public long currentCompactedKVs = 0; + /** the total size of data processed by the currently running compaction, in bytes */ + public long totalCompactedSize = 0; /** Constructor * @param totalCompactingKVs the total Key/Value pairs to be compacted @@ -66,4 +68,31 @@ public class CompactionProgress { public void complete() { this.totalCompactingKVs = this.currentCompactedKVs; } + + /** + * @return the total compacting key values in currently running compaction + */ + public long getTotalCompactingKvs() { + return totalCompactingKVs; + } + + /** + * @return the completed count of key values in currently running compaction + */ + public long getCurrentCompactedKvs() { + return currentCompactedKVs; + } + + /** + * @return the total data size processed by the currently running compaction, in bytes + */ + public long getTotalCompactedSize() { + return totalCompactedSize; + } + + @Override + public String toString() { + return String.format("%d/%d (%.2f%%)", currentCompactedKVs, totalCompactingKVs, + 100 * getProgressPct()); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java index 9e792c4..d161478 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java @@ -210,8 +210,13 @@ public abstract class Compactor { // Since scanner.next() can return 'false' but still be delivering data, // we have to use a do/while loop. List kvs = new ArrayList(); - // Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME int closeCheckInterval = HStore.getCloseCheckInterval(); + long lastMillis; + if (LOG.isDebugEnabled()) { + lastMillis = System.currentTimeMillis(); + } else { + lastMillis = 0; + } boolean hasMore; do { hasMore = scanner.next(kvs, compactionKVMax); @@ -223,11 +228,22 @@ public abstract class Compactor { } writer.append(kv); ++progress.currentCompactedKVs; + progress.totalCompactedSize += kv.getLength(); // check periodically to see if a system stop is requested if (closeCheckInterval > 0) { bytesWritten += kv.getLength(); if (bytesWritten > closeCheckInterval) { + // Log the progress of long running compactions every minute if + // logging at DEBUG level + if (LOG.isDebugEnabled()) { + long now = System.currentTimeMillis(); + if ((now - lastMillis) >= 60 * 1000) { + LOG.debug("Compaction progress: " + progress + String.format(", rate=%.2f kB/sec", + (bytesWritten / 1024.0) / ((now - lastMillis) / 1000.0))); + lastMillis = now; + } + } bytesWritten = 0; if (!store.areWritesEnabled()) { progress.cancel(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java index 31686f5..3763355 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java @@ -211,4 +211,34 @@ public class MetricsRegionServerWrapperStub implements MetricsRegionServerWrappe return 1024000; } + @Override + public long getFlushedCellsCount() { + return 100000000; + } + + @Override + public long getCompactedCellsCount() { + return 10000000; + } + + @Override + public long getMajorCompactedCellsCount() { + return 1000000; + } + + @Override + public long getFlushedCellsSize() { + return 1024000000; + } + + @Override + public long getCompactedCellsSize() { + return 102400000; + } + + @Override + public long getMajorCompactedCellsSize() { + return 10240000; + } + } \ No newline at end of file -- 1.8.5.2 (Apple Git-48)