commit bc3a6744c00ebd7b0bce1359e027c20e8557fd71 Author: Todd Lipcon Date: Mon Feb 14 14:01:11 2011 -0800 HBASE-3531. When under global memstore pressure, may try to flush unflushable regions in a tight loop diff --git src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java index 57f4d2c..6596872 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java @@ -36,7 +36,9 @@ import java.util.ConcurrentModificationException; import java.util.EnumSet; import java.util.HashMap; import java.util.Map; +import java.util.Set; import java.util.SortedMap; +import java.util.TreeSet; import java.util.concurrent.BlockingQueue; import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; @@ -149,36 +151,54 @@ class MemStoreFlusher extends Thread implements FlushRequester { private boolean flushOneForGlobalPressure() { SortedMap regionsBySize = server.getCopyOfOnlineRegionsSortedBySize(); - // Find the biggest region that doesn't have too many storefiles - HRegion bestFlushableRegion = getBiggestMemstoreRegion(regionsBySize, true); - // Find the biggest region, total, even if it might have too many flushes. - HRegion bestAnyRegion = getBiggestMemstoreRegion(regionsBySize, false); - Preconditions.checkState(bestAnyRegion != null, - "Above memory mark but there are no regions!"); - - HRegion regionToFlush; - if (bestAnyRegion.memstoreSize.get() > 2 * bestFlushableRegion.memstoreSize.get()) { - // Even if it's not supposed to be flushed, pick a region if it's more than twice - // as big as the best flushable one - otherwise when we're under pressure we make - // lots of little flushes and cause lots of compactions, etc, which just makes - // life worse! 
- LOG.info("Under global heap pressure: " + - "Region " + bestAnyRegion.getRegionNameAsString() + " has too many " + - "store files, but is " + - StringUtils.humanReadableInt(bestAnyRegion.memstoreSize.get()) + - " vs best flushable region's " + - StringUtils.humanReadableInt(bestFlushableRegion.memstoreSize.get()) + - ". Choosing the bigger."); - regionToFlush = bestAnyRegion; - } else { - regionToFlush = bestFlushableRegion; + // TODO: HBASE-3532 - we can't use Set<HRegion> here because it doesn't + // implement equals correctly. So, set of region names. + Set excludedRegionNames = new TreeSet(Bytes.BYTES_COMPARATOR); + + boolean flushedOne = false; + while (!flushedOne) { + // Find the biggest region that doesn't have too many storefiles + HRegion bestFlushableRegion = getBiggestMemstoreRegion( + regionsBySize, excludedRegionNames, true); + // Find the biggest region, total, even if it might have too many flushes. + HRegion bestAnyRegion = getBiggestMemstoreRegion( + regionsBySize, excludedRegionNames, false); + + if (bestAnyRegion == null) { + LOG.fatal("Above memory mark but there are no flushable regions!"); + return false; + } + + HRegion regionToFlush; + if (bestAnyRegion.memstoreSize.get() > 2 * bestFlushableRegion.memstoreSize.get()) { + // Even if it's not supposed to be flushed, pick a region if it's more than twice + // as big as the best flushable one - otherwise when we're under pressure we make + // lots of little flushes and cause lots of compactions, etc, which just makes + // life worse! + LOG.info("Under global heap pressure: " + + "Region " + bestAnyRegion.getRegionNameAsString() + " has too many " + + "store files, but is " + + StringUtils.humanReadableInt(bestAnyRegion.memstoreSize.get()) + + " vs best flushable region's " + + StringUtils.humanReadableInt(bestFlushableRegion.memstoreSize.get()) + + ". 
Choosing the bigger."); + regionToFlush = bestAnyRegion; + } else { + regionToFlush = bestFlushableRegion; + } + + Preconditions.checkState(regionToFlush.memstoreSize.get() > 0); + + LOG.info("Flush of region " + regionToFlush + " due to global heap pressure"); + flushedOne = flushRegion(regionToFlush, true); + if (!flushedOne) { + LOG.info("Excluding unflushable region " + regionToFlush + + " - trying to find a different region to flush."); + excludedRegionNames.add(regionToFlush.getRegionName()); + } } - - Preconditions.checkState(regionToFlush.memstoreSize.get() > 0); - - LOG.info("Flush of region " + regionToFlush + " due to global heap pressure"); - return flushRegion(regionToFlush, true); + return true; } @Override @@ -191,7 +211,20 @@ class MemStoreFlusher extends Thread implements FlushRequester { if (fqe == null || fqe instanceof WakeupFlushThread) { if (isAboveLowWaterMark()) { LOG.info("Flush thread woke up with memory above low water."); - flushOneForGlobalPressure(); + if (!flushOneForGlobalPressure()) { + // Wasn't able to flush any region, but we're above low water mark + // This is unlikely to happen, but might happen when closing the + // entire server - another thread is flushing regions. 
We'll just + // sleep a little bit to avoid spinning, and then pretend that + // we flushed one, so anyone blocked will check again + lock.lock(); + try { + Thread.sleep(1000); + flushOccurred.signalAll(); + } finally { + lock.unlock(); + } + } // Enqueue another one of these tokens so we'll wake up again wakeupFlushThread(); } @@ -214,6 +247,14 @@ class MemStoreFlusher extends Thread implements FlushRequester { } this.regionsInQueue.clear(); this.flushQueue.clear(); + + // Signal anyone waiting, so they see the close flag + lock.lock(); + try { + flushOccurred.signalAll(); + } finally { + lock.unlock(); + } LOG.info(getName() + " exiting"); } @@ -225,9 +266,14 @@ class MemStoreFlusher extends Thread implements FlushRequester { private HRegion getBiggestMemstoreRegion( SortedMap regionsBySize, + Set excludedRegionNames, boolean checkStoreFileCount) { synchronized (regionsInQueue) { for (HRegion region : regionsBySize.values()) { + if (excludedRegionNames.contains(region.getRegionName())) { + continue; + } + if (checkStoreFileCount && isTooManyStoreFiles(region)) { continue; } @@ -382,9 +428,15 @@ class MemStoreFlusher extends Thread implements FlushRequester { if (isAboveHighWaterMark()) { lock.lock(); try { - while (isAboveHighWaterMark()) { + while (isAboveHighWaterMark() && !server.isStopped()) { wakeupFlushThread(); - flushOccurred.awaitUninterruptibly(); + try { + // we should be able to wait forever, but we've seen a bug where + // we miss a notify, so put a 5 second bound on it at least. + flushOccurred.await(5, TimeUnit.SECONDS); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } } } finally { lock.unlock();