Index: src/java/org/apache/hadoop/hbase/regionserver/MemcacheFlusher.java =================================================================== --- src/java/org/apache/hadoop/hbase/regionserver/MemcacheFlusher.java (revision 764692) +++ src/java/org/apache/hadoop/hbase/regionserver/MemcacheFlusher.java (working copy) @@ -68,7 +68,8 @@ "hbase.regionserver.globalMemcache.upperLimit"; public static final String LOWER_KEY = "hbase.regionserver.globalMemcache.lowerLimit"; - + private final long blockingStoreFilesNumber; + /** * @param conf * @param server @@ -89,6 +90,8 @@ "because supplied " + LOWER_KEY + " was > " + UPPER_KEY); } this.globalMemcacheLimitLowMark = lower; + this.blockingStoreFilesNumber = 1 + + conf.getInt("hbase.hstore.compactionThreshold", 3); LOG.info("globalMemcacheLimit=" + StringUtils.humanReadableInt(this.globalMemcacheLimit) + ", globalMemcacheLimitLowMark=" + @@ -204,19 +207,46 @@ * it may have been determined that the region had a significant amount of * memory in use and needed to be flushed to relieve memory pressure. In this * case, its flush may preempt the pending request in the queue, and if so, - * it needs to be removed from the queue to avoid flushing the region multiple - * times. + * it needs to be removed from the queue to avoid flushing the region + * multiple times. * * @return true if the region was successfully flushed, false otherwise. If * false, there will be accompanying log messages explaining why the log was * not flushed. */ private boolean flushRegion(HRegion region, boolean removeFromQueue) { + // Wait until it is safe to flush + int count = 0; + boolean triggered = false; + while (count++ < 120) { // wait up to 1 minute, max + for (Store hstore: region.stores.values()) { + if (hstore.getStorefilesCount() > this.blockingStoreFilesNumber) { + if (!triggered) { + server.compactSplitThread.compactionRequested(region, getName()); + LOG.info("Too many store files for region " + region + ": " + + hstore.getStorefilesCount() + ", waiting"); + triggered = true; + } + try { + Thread.sleep(500); + } catch (InterruptedException e) { + // ignore + } + continue; + } + } + if (triggered) { + LOG.info("Compaction completed on region " + region + + ", proceeding"); + } + break; + } synchronized (regionsInQueue) { + // See comment above for removeFromQueue on why we do not // take the region out of the set. If removeFromQueue is true, remove it - // from the queue too if it is there. This didn't used to be a constraint, - // but now that HBASE-512 is in play, we need to try and limit - // double-flushing of regions. + // from the queue too if it is there. This didn't used to be a + // constraint, but now that HBASE-512 is in play, we need to try and + // limit double-flushing of regions. if (regionsInQueue.remove(region) && removeFromQueue) { flushQueue.remove(region); }