Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java (revision 1366266) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java (working copy) @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; +import java.lang.Thread.UncaughtExceptionHandler; import java.lang.management.ManagementFactory; import java.util.ConcurrentModificationException; import java.util.HashMap; @@ -32,8 +33,7 @@ import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.RemoteExceptionHandler; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.HasThread; +import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.util.StringUtils; import com.google.common.base.Preconditions; @@ -58,7 +59,7 @@ * @see FlushRequester */ @InterfaceAudience.Private -class MemStoreFlusher extends HasThread implements FlushRequester { +class MemStoreFlusher implements FlushRequester { static final Log LOG = LogFactory.getLog(MemStoreFlusher.class); // These two data members go together. Any entry in the one must have // a corresponding entry in the other. @@ -70,8 +71,8 @@ private final long threadWakeFrequency; private final HRegionServer server; - private final ReentrantLock lock = new ReentrantLock(); - private final Condition flushOccurred = lock.newCondition(); + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + private final Object blockSignal = new Object(); protected final long globalMemStoreLimit; protected final long globalMemStoreLimitLowMark; @@ -85,6 +86,9 @@ private long blockingStoreFilesNumber; private long blockingWaitTime; + private FlushHandler[] flushHandlers = null; + private int handlerCount; + /** * @param conf * @param server @@ -109,6 +113,7 @@ conf.getInt("hbase.hstore.blockingStoreFiles", 7); this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime", 90000); + this.handlerCount = conf.getInt("hbase.hstore.flusher.count", 1); LOG.info("globalMemStoreLimit=" + StringUtils.humanReadableInt(this.globalMemStoreLimit) + ", globalMemStoreLimitLowMark=" + @@ -207,64 +212,69 @@ return true; } - @Override - public void run() { - while (!this.server.isStopped()) { - FlushQueueEntry fqe = null; - try { - wakeupPending.set(false); // allow someone to wake us up again - fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS); - if (fqe == null || fqe instanceof WakeupFlushThread) { - if (isAboveLowWaterMark()) { - LOG.debug("Flush thread woke up because memory above low water=" + - StringUtils.humanReadableInt(this.globalMemStoreLimitLowMark)); - if (!flushOneForGlobalPressure()) { - // Wasn't able to flush any region, but we're above low water mark - // This is unlikely to happen, but might happen when closing the - // entire server - another thread is flushing regions. We'll just - // sleep a little bit to avoid spinning, and then pretend that - // we flushed one, so anyone blocked will check again - lock.lock(); - try { + private class FlushHandler extends HasThread { + + private final int instanceNumber; + + FlushHandler(int instanceNumber) { + this.instanceNumber = instanceNumber; + this.setDaemon(true); + String threadName = "'MemStore Flush Handler " + this.instanceNumber; + this.setName(threadName); + } + + @Override + public void run() { + while (!server.isStopped()) { + FlushQueueEntry fqe = null; + try { + wakeupPending.set(false); // allow someone to wake us up again + fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS); + if (fqe == null || fqe instanceof WakeupFlushThread) { + if (isAboveLowWaterMark()) { + LOG.debug("Flush thread woke up because memory above low water=" + + StringUtils.humanReadableInt(globalMemStoreLimitLowMark)); + if (!flushOneForGlobalPressure()) { + // Wasn't able to flush any region, but we're above low water mark + // This is unlikely to happen, but might happen when closing the + // entire server - another thread is flushing regions. We'll just + // sleep a little bit to avoid spinning, and then pretend that + // we flushed one, so anyone blocked will check again Thread.sleep(1000); - flushOccurred.signalAll(); - } finally { - lock.unlock(); + wakeUpIfBlocking(); } + // Enqueue another one of these tokens so we'll wake up again + wakeupFlushThread(); } - // Enqueue another one of these tokens so we'll wake up again - wakeupFlushThread(); + continue; } + FlushRegionEntry fre = (FlushRegionEntry) fqe; + if (!flushRegion(fre)) { + break; + } + } catch (InterruptedException ex) { continue; + } catch (ConcurrentModificationException ex) { + continue; + } catch (Exception ex) { + LOG.error("Cache flusher failed for entry " + fqe, ex); + if (!server.checkFileSystem()) { + break; + } } - FlushRegionEntry fre = (FlushRegionEntry)fqe; - if (!flushRegion(fre)) { - break; - } - } catch (InterruptedException ex) { - continue; - } catch (ConcurrentModificationException ex) { - continue; - } catch (Exception ex) { - LOG.error("Cache flusher failed for entry " + fqe, ex); - if (!server.checkFileSystem()) { - break; - } } - } - this.regionsInQueue.clear(); - this.flushQueue.clear(); + synchronized (regionsInQueue) { + regionsInQueue.clear(); + flushQueue.clear(); + } - // Signal anyone waiting, so they see the close flag - lock.lock(); - try { - flushOccurred.signalAll(); - } finally { - lock.unlock(); + // Signal anyone waiting, so they see the close flag + wakeUpIfBlocking(); + LOG.info(getName() + " exiting"); } - LOG.info(getName() + " exiting"); } + private void wakeupFlushThread() { if (wakeupPending.compareAndSet(false, true)) { flushQueue.add(new WakeupFlushThread()); @@ -326,14 +336,44 @@ * Only interrupt once it's done with a run through the work loop. */ void interruptIfNecessary() { - lock.lock(); + lock.writeLock().lock(); try { - this.interrupt(); + for (FlushHandler flushHander : flushHandlers) { + if (flushHander != null) flushHander.interrupt(); + } } finally { - lock.unlock(); + lock.writeLock().unlock(); } } + synchronized void start(UncaughtExceptionHandler eh) { + flushHandlers = new FlushHandler[handlerCount]; + for (int i = 0; i < flushHandlers.length; i++) { + flushHandlers[i] = new FlushHandler(i); + if (eh != null) { + flushHandlers[i].setUncaughtExceptionHandler(eh); + } + flushHandlers[i].start(); + } + } + + boolean isAlive() { + for (FlushHandler flushHander : flushHandlers) { + if (flushHander != null && flushHander.isAlive()) { + return true; + } + } + return false; + } + + void join() { + for (FlushHandler flushHander : flushHandlers) { + if (flushHander != null) { + Threads.shutdown(flushHander.getThread()); + } + } + } + /* * A flushRegion that checks store file count. If too many, puts the flush * on delay queue to retry later. @@ -359,7 +399,8 @@ "store files; delaying flush up to " + this.blockingWaitTime + "ms"); if (!this.server.compactSplitThread.requestSplit(region)) { try { - this.server.compactSplitThread.requestCompaction(region, getName()); + this.server.compactSplitThread.requestCompaction(region, Thread + .currentThread().getName()); } catch (IOException e) { LOG.error("Cache flush failed" + (region != null ? (" for region " + Bytes.toStringBinary(region.getRegionName())) : ""), @@ -398,8 +439,8 @@ // emergencyFlush, then item was removed via a flushQueue.poll. flushQueue.remove(fqe); } - lock.lock(); } + lock.readLock().lock(); try { boolean shouldCompact = region.flushcache(); // We just want to check the size @@ -407,7 +448,7 @@ if (shouldSplit) { this.server.compactSplitThread.requestSplit(region); } else if (shouldCompact) { - server.compactSplitThread.requestCompaction(region, getName()); + server.compactSplitThread.requestCompaction(region, Thread.currentThread().getName()); } server.getMetrics().addFlush(region.getRecentFlushInfo()); @@ -427,15 +468,18 @@ return false; } } finally { - try { - flushOccurred.signalAll(); - } finally { - lock.unlock(); - } + lock.readLock().unlock(); + wakeUpIfBlocking(); } return true; } + private void wakeUpIfBlocking() { + synchronized (blockSignal) { + blockSignal.notifyAll(); + } + } + private boolean isTooManyStoreFiles(HRegion region) { for (Store hstore: region.stores.values()) { if (hstore.getStorefilesCount() > this.blockingStoreFilesNumber) { @@ -453,21 +497,21 @@ */ public synchronized void reclaimMemStoreMemory() { if (isAboveHighWaterMark()) { - lock.lock(); - try { + long start = System.currentTimeMillis(); + synchronized (this.blockSignal) { while (isAboveHighWaterMark() && !server.isStopped()) { wakeupFlushThread(); try { // we should be able to wait forever, but we've seen a bug where // we miss a notify, so put a 5 second bound on it at least. - flushOccurred.await(5, TimeUnit.SECONDS); + blockSignal.wait(5 * 1000); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); } } - } finally { - lock.unlock(); } + long took = System.currentTimeMillis() - start; + LOG.warn("Memstore is above high water mark and block " + took + "ms"); } else if (isAboveLowWaterMark()) { wakeupFlushThread(); } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1366266) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy) @@ -1526,8 +1526,7 @@ conf.getInt("hbase.regionserver.executor.closemeta.threads", 1)); Threads.setDaemonThreadRunning(this.hlogRoller.getThread(), n + ".logRoller", handler); - Threads.setDaemonThreadRunning(this.cacheFlusher.getThread(), n + ".cacheFlusher", - handler); + this.cacheFlusher.start(handler); Threads.setDaemonThreadRunning(this.compactionChecker.getThread(), n + ".compactionChecker", handler); @@ -1759,7 +1758,7 @@ */ protected void join() { Threads.shutdown(this.compactionChecker.getThread()); - Threads.shutdown(this.cacheFlusher.getThread()); + this.cacheFlusher.join(); if (this.hlogRoller != null) { Threads.shutdown(this.hlogRoller.getThread()); }