diff --git src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
index 46239e9..8aaf2c3 100644
--- src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
+++ src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
@@ -28,10 +28,12 @@ import org.apache.hadoop.hbase.RemoteExceptionHandler;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.util.StringUtils;
 
+import com.google.common.base.Preconditions;
+
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
-import java.util.ArrayList;
 import java.util.ConcurrentModificationException;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.SortedMap;
@@ -39,6 +41,8 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.DelayQueue;
 import java.util.concurrent.Delayed;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
 /**
@@ -56,12 +60,14 @@ class MemStoreFlusher extends Thread implements FlushRequester {
   // a corresponding entry in the other.
   private final BlockingQueue<FlushQueueEntry> flushQueue =
     new DelayQueue<FlushQueueEntry>();
-  private final Map<HRegion, FlushQueueEntry> regionsInQueue =
-    new HashMap<HRegion, FlushQueueEntry>();
+  private final Map<HRegion, FlushRegionEntry> regionsInQueue =
+    new HashMap<HRegion, FlushRegionEntry>();
+  private final AtomicBoolean wakeupPending = new AtomicBoolean();
 
   private final long threadWakeFrequency;
   private final HRegionServer server;
   private final ReentrantLock lock = new ReentrantLock();
+  private final Condition flushOccurred = lock.newCondition();
 
   protected final long globalMemStoreLimit;
   protected final long globalMemStoreLimitLowMark;
@@ -133,17 +139,72 @@ class MemStoreFlusher extends Thread implements FlushRequester {
     }
     return (long)(max * limit);
   }
+
+  /**
+   * Flushes a single region to relieve global memstore pressure.
+   * <p>
+   * Picks the region with the biggest memstore that does not have too many
+   * store files, unless the biggest region overall is more than twice as
+   * big, or no region qualifies, in which case the biggest region overall
+   * is flushed.
+   * @return the result of the emergency {@code flushRegion} call for the
+   * chosen region
+   * @throws IllegalStateException if there are no online regions or the
+   * chosen region's memstore size is not positive
+   */
+  private boolean flushOneForGlobalPressure() {
+    // There is nothing we have been *asked* to flush, but we're above low water mark.
+    // First try to pick the biggest region that does not already have too many
+    // store files, then the biggest region regardless of store file count.
+    HRegion bestFlushableRegion = getBiggestMemstoreRegion(EnumSet.of(
+        RegionQualification.NOT_TOOMANYSTORES));
+    HRegion bestAnyRegion = getBiggestMemstoreRegion(EnumSet.noneOf(RegionQualification.class));
+
+    Preconditions.checkState(bestAnyRegion != null,
+        "Above memory mark but there are no regions!");
+
+    HRegion regionToFlush;
+    if (bestFlushableRegion == null) {
+      // Every online region has too many store files, so nothing counts as
+      // flushable; fall back to the biggest region instead of dereferencing null.
+      regionToFlush = bestAnyRegion;
+    } else if (bestAnyRegion.memstoreSize.get() > 2 * bestFlushableRegion.memstoreSize.get()) {
+      LOG.info("Under global heap pressure: " +
+          "Region " + bestAnyRegion.getRegionNameAsString() + " has too many " +
+          "store files, but is " +
+          StringUtils.humanReadableInt(bestAnyRegion.memstoreSize.get()) +
+          " vs best flushable region's " +
+          StringUtils.humanReadableInt(bestFlushableRegion.memstoreSize.get()) +
+          ". Choosing the bigger.");
+      regionToFlush = bestAnyRegion;
+    } else {
+      regionToFlush = bestFlushableRegion;
+    }
+
+    Preconditions.checkState(regionToFlush.memstoreSize.get() > 0);
+
+    LOG.info("Flush of region " + regionToFlush + " due to global heap pressure");
+    return flushRegion(regionToFlush, true);
+  }
 
   @Override
   public void run() {
     while (!this.server.isStopped()) {
       FlushQueueEntry fqe = null;
       try {
+        wakeupPending.set(false); // allow someone to wake us up again
         fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
-        if (fqe == null) {
+        if (fqe == null || fqe instanceof WakeupFlushThread) {
+          if (isAboveLowWaterMark()) {
+            LOG.info("Flush thread woke up with memory above low water.");
+            flushOneForGlobalPressure();
+            // Enqueue another one of these tokens so we'll wake up again
+            wakeupFlushThread();
+          }
           continue;
         }
-        if (!flushRegion(fqe)) {
+        FlushRegionEntry fre = (FlushRegionEntry)fqe;
+        if (!flushRegion(fre)) {
           break;
         }
       } catch (InterruptedException ex) {
@@ -151,9 +212,7 @@ class MemStoreFlusher extends Thread implements FlushRequester {
       } catch (ConcurrentModificationException ex) {
         continue;
       } catch (Exception ex) {
-        LOG.error("Cache flush failed" +
-          (fqe != null ? (" for region " + Bytes.toString(fqe.region.getRegionName())) : ""),
-          ex);
+        LOG.error("Cache flusher failed for entry " + fqe, ex);
         if (!server.checkFileSystem()) {
           break;
         }
@@ -164,12 +223,70 @@ class MemStoreFlusher extends Thread implements FlushRequester {
     LOG.info(getName() + " exiting");
   }
 
+  /**
+   * Puts a {@link WakeupFlushThread} token on the flush queue so that the
+   * flusher thread's poll returns without waiting out its timeout. Does
+   * nothing if a wakeup is already pending.
+   */
+  private void wakeupFlushThread() {
+    if (wakeupPending.compareAndSet(false, true)) {
+      flushQueue.add(new WakeupFlushThread());
+    }
+  }
+
+  /** Conditions a region can be required to meet by getBiggestMemstoreRegion. */
+  enum RegionQualification {
+    /** The region must not already have an entry in regionsInQueue. */
+    NOT_QUEUED,
+    /** The region must not be reported by isTooManyStoreFiles. */
+    NOT_TOOMANYSTORES;
+  }
+
+  /**
+   * Returns the first region, in the order of the map returned by
+   * {@link HRegionServer#getCopyOfOnlineRegionsSortedBySize()}, that meets
+   * every passed qualification.
+   * @param qualifications conditions a region must meet to be returned
+   * @return the matching region, or null if no online region qualifies
+   */
+  private HRegion getBiggestMemstoreRegion(EnumSet<RegionQualification> qualifications) {
+    SortedMap<Long, HRegion> regionsBySize = server.getCopyOfOnlineRegionsSortedBySize();
+    synchronized (regionsInQueue) {
+      for (HRegion region : regionsBySize.values()) {
+        if (qualifications.contains(RegionQualification.NOT_QUEUED) &&
+            regionsInQueue.containsKey(region)) {
+          continue;
+        }
+        if (qualifications.contains(RegionQualification.NOT_TOOMANYSTORES) &&
+            isTooManyStoreFiles(region)) {
+          continue;
+        }
+        return region;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Return true if global memory usage is at or above the high watermark
+   */
+  private boolean isAboveHighWaterMark() {
+    return server.getGlobalMemStoreSize() >= globalMemStoreLimit;
+  }
+
+  /**
+   * Return true if global memory usage is at or above the low watermark
+   */
+  private boolean isAboveLowWaterMark() {
+    return server.getGlobalMemStoreSize() >= globalMemStoreLimitLowMark;
+  }
+
   public void requestFlush(HRegion r) {
     synchronized (regionsInQueue) {
       if (!regionsInQueue.containsKey(r)) {
         // This entry has no delay so it will be added at the top of the flush
         // queue. It'll come out near immediately.
-        FlushQueueEntry fqe = new FlushQueueEntry(r);
+        FlushRegionEntry fqe = new FlushRegionEntry(r);
         this.regionsInQueue.put(r, fqe);
         this.flushQueue.add(fqe);
       }
@@ -196,7 +313,7 @@ class MemStoreFlusher extends Thread implements FlushRequester {
    * false, there will be accompanying log messages explaining why the log was
    * not flushed.
    */
-  private boolean flushRegion(final FlushQueueEntry fqe) {
+  private boolean flushRegion(final FlushRegionEntry fqe) {
     HRegion region = fqe.region;
     if (!fqe.region.getRegionInfo().isMetaRegion() &&
         isTooManyStoreFiles(region)) {
@@ -237,7 +354,7 @@ class MemStoreFlusher extends Thread implements FlushRequester {
    */
   private boolean flushRegion(final HRegion region, final boolean emergencyFlush) {
     synchronized (this.regionsInQueue) {
-      FlushQueueEntry fqe = this.regionsInQueue.remove(region);
+      FlushRegionEntry fqe = this.regionsInQueue.remove(region);
       if (fqe != null && emergencyFlush) {
         // Need to remove from region from delay queue. When NOT an
         // emergencyFlush, then item was removed via a flushQueue.poll.
@@ -266,6 +383,7 @@ class MemStoreFlusher extends Thread implements FlushRequester {
         return false;
       }
     } finally {
+      flushOccurred.signalAll();
       lock.unlock();
     }
     return true;
@@ -287,49 +405,40 @@ class MemStoreFlusher extends Thread implements FlushRequester {
    * amount of memstore consumption.
    */
   public synchronized void reclaimMemStoreMemory() {
-    if (server.getGlobalMemStoreSize() >= globalMemStoreLimit) {
-      flushSomeRegions();
+    if (isAboveHighWaterMark()) {
+      lock.lock();
+      try {
+        while (isAboveHighWaterMark()) {
+          wakeupFlushThread();
+          flushOccurred.awaitUninterruptibly();
+        }
+      } finally {
+        lock.unlock();
+      }
+    } else if (isAboveLowWaterMark()) {
+      wakeupFlushThread();
     }
   }
 
-  /*
-   * Emergency! Need to flush memory.
+  /** Common type of everything that is put on the flush queue. */
+  interface FlushQueueEntry extends Delayed {}
+
+  /**
+   * Token to insert into the flush queue that ensures that the flusher does not sleep
    */
-  private synchronized void flushSomeRegions() {
-    // keep flushing until we hit the low water mark
-    long globalMemStoreSize = -1;
-    ArrayList<HRegion> regionsToCompact = new ArrayList<HRegion>();
-    for (SortedMap<Long, HRegion> m =
-        this.server.getCopyOfOnlineRegionsSortedBySize();
-        (globalMemStoreSize = server.getGlobalMemStoreSize()) >=
-          this.globalMemStoreLimitLowMark;) {
-      // flush the region with the biggest memstore
-      if (m.size() <= 0) {
-        LOG.info("No online regions to flush though we've been asked flush " +
-          "some; globalMemStoreSize=" +
-          StringUtils.humanReadableInt(globalMemStoreSize) +
-          ", globalMemStoreLimitLowMark=" +
-          StringUtils.humanReadableInt(this.globalMemStoreLimitLowMark));
-        break;
-      }
-      HRegion biggestMemStoreRegion = m.remove(m.firstKey());
-      LOG.info("Forced flushing of " + biggestMemStoreRegion.toString() +
-        " because global memstore limit of " +
-        StringUtils.humanReadableInt(this.globalMemStoreLimit) +
-        " exceeded; currently " +
-        StringUtils.humanReadableInt(globalMemStoreSize) + " and flushing till " +
-        StringUtils.humanReadableInt(this.globalMemStoreLimitLowMark));
-      if (!flushRegion(biggestMemStoreRegion, true)) {
-        LOG.warn("Flush failed");
-        break;
-      }
-      regionsToCompact.add(biggestMemStoreRegion);
+  static class WakeupFlushThread implements FlushQueueEntry {
+    @Override
+    public long getDelay(TimeUnit unit) {
+      return 0;
     }
-    for (HRegion region : regionsToCompact) {
-      server.compactSplitThread.requestCompaction(region, getName());
+
+    @Override
+    public int compareTo(Delayed o) {
+      // Always reports itself as ordered before the other entry.
+      return -1;
     }
   }
-  
+
   /**
    * Datastructure used in the flush queue. Holds region and retry count.
    * Keeps tabs on how old this object is. Implements {@link Delayed}. On
@@ -338,13 +447,14 @@ class MemStoreFlusher extends Thread implements FlushRequester {
    * milliseconds before readding to delay queue if you want it to stay there
    * a while.
    */
-  static class FlushQueueEntry implements Delayed {
+  static class FlushRegionEntry implements FlushQueueEntry {
     private final HRegion region;
+
     private final long createTime;
     private long whenToExpire;
     private int requeueCount = 0;
 
-    FlushQueueEntry(final HRegion r) {
+    FlushRegionEntry(final HRegion r) {
       this.region = r;
       this.createTime = System.currentTimeMillis();
       this.whenToExpire = this.createTime;
@@ -372,7 +482,7 @@ class MemStoreFlusher extends Thread implements FlushRequester {
      * to whatever you pass.
      * @return This.
      */
-    public FlushQueueEntry requeue(final long when) {
+    public FlushRegionEntry requeue(final long when) {
       this.whenToExpire = System.currentTimeMillis() + when;
       this.requeueCount++;
       return this;
@@ -389,5 +499,10 @@ class MemStoreFlusher extends Thread implements FlushRequester {
       return Long.valueOf(getDelay(TimeUnit.MILLISECONDS) -
         other.getDelay(TimeUnit.MILLISECONDS)).intValue();
     }
+
+    @Override
+    public String toString() {
+      return "[flush region " + Bytes.toString(region.getRegionName()) + "]";
+    }
   }
 }