Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (revision 1366266) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (working copy) @@ -44,8 +44,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -225,7 +224,7 @@ // This lock prevents starting a log roll during a cache flush. // synchronized is insufficient because a cache flush spans two method calls. - private final Lock cacheFlushLock = new ReentrantLock(); + private final ReentrantReadWriteLock cacheFlushLock = new ReentrantReadWriteLock(); // We synchronize on updateLock to prevent updates and to prevent a log roll // during an update @@ -605,7 +604,7 @@ return null; } byte [][] regionsToFlush = null; - this.cacheFlushLock.lock(); + this.cacheFlushLock.writeLock().lock(); try { this.logRollRunning = true; if (closed) { @@ -676,7 +675,7 @@ try { this.logRollRunning = false; } finally { - this.cacheFlushLock.unlock(); + this.cacheFlushLock.writeLock().unlock(); } } return regionsToFlush; @@ -992,7 +991,7 @@ LOG.error("Exception while waiting for syncer thread to die", e); } - cacheFlushLock.lock(); + cacheFlushLock.writeLock().lock(); try { // Tell our listeners that the log is closing if (!this.listeners.isEmpty()) { @@ -1010,7 +1009,7 @@ } } } finally { - cacheFlushLock.unlock(); + cacheFlushLock.writeLock().unlock(); } } @@ -1540,7 +1539,7 @@ * @see #abortCacheFlush(byte[]) */ public long startCacheFlush(final byte[] encodedRegionName) { - this.cacheFlushLock.lock(); + this.cacheFlushLock.readLock().lock(); Long seq = this.lastSeqWritten.remove(encodedRegionName); // seq is the lsn of the oldest edit associated with this region. If a // snapshot already exists - because the last flush failed - then seq will @@ -1604,7 +1603,7 @@ // Cleaning up of lastSeqWritten is in the finally clause because we // don't want to confuse getOldestOutstandingSeqNum() this.lastSeqWritten.remove(getSnapshotName(encodedRegionName)); - this.cacheFlushLock.unlock(); + this.cacheFlushLock.readLock().unlock(); } } @@ -1641,7 +1640,7 @@ Runtime.getRuntime().halt(1); } } - this.cacheFlushLock.unlock(); + this.cacheFlushLock.readLock().unlock(); } /** Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java (revision 1366266) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java (working copy) @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; +import java.lang.Thread.UncaughtExceptionHandler; import java.lang.management.ManagementFactory; import java.util.ConcurrentModificationException; import java.util.HashMap; @@ -32,8 +33,7 @@ import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.RemoteExceptionHandler; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.HasThread; +import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.util.StringUtils; import com.google.common.base.Preconditions; @@ -58,7 +59,7 @@ * @see FlushRequester */ @InterfaceAudience.Private -class MemStoreFlusher extends HasThread implements FlushRequester { +class MemStoreFlusher implements FlushRequester { static final Log LOG = LogFactory.getLog(MemStoreFlusher.class); // These two data members go together. Any entry in the one must have // a corresponding entry in the other. @@ -70,8 +71,8 @@ private final long threadWakeFrequency; private final HRegionServer server; - private final ReentrantLock lock = new ReentrantLock(); - private final Condition flushOccurred = lock.newCondition(); + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + private final Object blockSignal = new Object(); protected final long globalMemStoreLimit; protected final long globalMemStoreLimitLowMark; @@ -85,6 +86,9 @@ private long blockingStoreFilesNumber; private long blockingWaitTime; + private FlushHandler[] flushHandlers = null; + private int handlerCount; + /** * @param conf * @param server @@ -109,6 +113,7 @@ conf.getInt("hbase.hstore.blockingStoreFiles", 7); this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime", 90000); + this.handlerCount = conf.getInt("hbase.hstore.flusher.count", 1); LOG.info("globalMemStoreLimit=" + StringUtils.humanReadableInt(this.globalMemStoreLimit) + ", globalMemStoreLimitLowMark=" + @@ -207,64 +212,69 @@ return true; } - @Override - public void run() { - while (!this.server.isStopped()) { - FlushQueueEntry fqe = null; - try { - wakeupPending.set(false); // allow someone to wake us up again - fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS); - if (fqe == null || fqe instanceof WakeupFlushThread) { - if (isAboveLowWaterMark()) { - LOG.debug("Flush thread woke up because memory above low water=" + - StringUtils.humanReadableInt(this.globalMemStoreLimitLowMark)); - if (!flushOneForGlobalPressure()) { - // Wasn't able to flush any region, but we're above low water mark - // This is unlikely to happen, but might happen when closing the - // entire server - another thread is flushing regions. We'll just - // sleep a little bit to avoid spinning, and then pretend that - // we flushed one, so anyone blocked will check again - lock.lock(); - try { + private class FlushHandler extends HasThread { + + private final int instanceNumber; + + FlushHandler(int instanceNumber) { + this.instanceNumber = instanceNumber; + this.setDaemon(true); + String threadName = "'MemStore Flush Handler " + this.instanceNumber; + this.setName(threadName); + } + + @Override + public void run() { + while (!server.isStopped()) { + FlushQueueEntry fqe = null; + try { + wakeupPending.set(false); // allow someone to wake us up again + fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS); + if (fqe == null || fqe instanceof WakeupFlushThread) { + if (isAboveLowWaterMark()) { + LOG.debug("Flush thread woke up because memory above low water=" + + StringUtils.humanReadableInt(globalMemStoreLimitLowMark)); + if (!flushOneForGlobalPressure()) { + // Wasn't able to flush any region, but we're above low water mark + // This is unlikely to happen, but might happen when closing the + // entire server - another thread is flushing regions. We'll just + // sleep a little bit to avoid spinning, and then pretend that + // we flushed one, so anyone blocked will check again Thread.sleep(1000); - flushOccurred.signalAll(); - } finally { - lock.unlock(); + wakeUpIfBlocking(); } + // Enqueue another one of these tokens so we'll wake up again + wakeupFlushThread(); } - // Enqueue another one of these tokens so we'll wake up again - wakeupFlushThread(); + continue; } + FlushRegionEntry fre = (FlushRegionEntry) fqe; + if (!flushRegion(fre)) { + break; + } + } catch (InterruptedException ex) { continue; + } catch (ConcurrentModificationException ex) { + continue; + } catch (Exception ex) { + LOG.error("Cache flusher failed for entry " + fqe, ex); + if (!server.checkFileSystem()) { + break; + } } - FlushRegionEntry fre = (FlushRegionEntry)fqe; - if (!flushRegion(fre)) { - break; - } - } catch (InterruptedException ex) { - continue; - } catch (ConcurrentModificationException ex) { - continue; - } catch (Exception ex) { - LOG.error("Cache flusher failed for entry " + fqe, ex); - if (!server.checkFileSystem()) { - break; - } } - } - this.regionsInQueue.clear(); - this.flushQueue.clear(); + synchronized (regionsInQueue) { + regionsInQueue.clear(); + flushQueue.clear(); + } - // Signal anyone waiting, so they see the close flag - lock.lock(); - try { - flushOccurred.signalAll(); - } finally { - lock.unlock(); + // Signal anyone waiting, so they see the close flag + wakeUpIfBlocking(); + LOG.info(getName() + " exiting"); } - LOG.info(getName() + " exiting"); } + private void wakeupFlushThread() { if (wakeupPending.compareAndSet(false, true)) { flushQueue.add(new WakeupFlushThread()); @@ -281,6 +291,10 @@ continue; } + if (region.writestate.flushing || !region.writestate.writesEnabled) { + continue; + } + if (checkStoreFileCount && isTooManyStoreFiles(region)) { continue; } @@ -326,14 +340,44 @@ * Only interrupt once it's done with a run through the work loop. */ void interruptIfNecessary() { - lock.lock(); + lock.writeLock().lock(); try { - this.interrupt(); + for (FlushHandler flushHander : flushHandlers) { + if (flushHander != null) flushHander.interrupt(); + } } finally { - lock.unlock(); + lock.writeLock().unlock(); } } + synchronized void start(UncaughtExceptionHandler eh) { + flushHandlers = new FlushHandler[handlerCount]; + for (int i = 0; i < flushHandlers.length; i++) { + flushHandlers[i] = new FlushHandler(i); + if (eh != null) { + flushHandlers[i].setUncaughtExceptionHandler(eh); + } + flushHandlers[i].start(); + } + } + + boolean isAlive() { + for (FlushHandler flushHander : flushHandlers) { + if (flushHander != null && flushHander.isAlive()) { + return true; + } + } + return false; + } + + void join() { + for (FlushHandler flushHander : flushHandlers) { + if (flushHander != null) { + Threads.shutdown(flushHander.getThread()); + } + } + } + /* * A flushRegion that checks store file count. If too many, puts the flush * on delay queue to retry later. @@ -359,7 +403,8 @@ "store files; delaying flush up to " + this.blockingWaitTime + "ms"); if (!this.server.compactSplitThread.requestSplit(region)) { try { - this.server.compactSplitThread.requestCompaction(region, getName()); + this.server.compactSplitThread.requestCompaction(region, Thread + .currentThread().getName()); } catch (IOException e) { LOG.error("Cache flush failed" + (region != null ? (" for region " + Bytes.toStringBinary(region.getRegionName())) : ""), @@ -398,8 +443,8 @@ // emergencyFlush, then item was removed via a flushQueue.poll. flushQueue.remove(fqe); } - lock.lock(); } + lock.readLock().lock(); try { boolean shouldCompact = region.flushcache(); // We just want to check the size @@ -407,7 +452,7 @@ if (shouldSplit) { this.server.compactSplitThread.requestSplit(region); } else if (shouldCompact) { - server.compactSplitThread.requestCompaction(region, getName()); + server.compactSplitThread.requestCompaction(region, Thread.currentThread().getName()); } server.getMetrics().addFlush(region.getRecentFlushInfo()); @@ -427,15 +472,18 @@ return false; } } finally { - try { - flushOccurred.signalAll(); - } finally { - lock.unlock(); - } + lock.readLock().unlock(); + wakeUpIfBlocking(); } return true; } + private void wakeUpIfBlocking() { + synchronized (blockSignal) { + blockSignal.notifyAll(); + } + } + private boolean isTooManyStoreFiles(HRegion region) { for (Store hstore: region.stores.values()) { if (hstore.getStorefilesCount() > this.blockingStoreFilesNumber) { @@ -453,21 +501,21 @@ */ public synchronized void reclaimMemStoreMemory() { if (isAboveHighWaterMark()) { - lock.lock(); - try { + long start = System.currentTimeMillis(); + synchronized (this.blockSignal) { while (isAboveHighWaterMark() && !server.isStopped()) { wakeupFlushThread(); try { // we should be able to wait forever, but we've seen a bug where // we miss a notify, so put a 5 second bound on it at least. - flushOccurred.await(5, TimeUnit.SECONDS); + blockSignal.wait(5 * 1000); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); } } - } finally { - lock.unlock(); } + long took = System.currentTimeMillis() - start; + LOG.warn("Memstore is above high water mark and block " + took + "ms"); } else if (isAboveLowWaterMark()) { wakeupFlushThread(); } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1366266) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy) @@ -1526,8 +1526,7 @@ conf.getInt("hbase.regionserver.executor.closemeta.threads", 1)); Threads.setDaemonThreadRunning(this.hlogRoller.getThread(), n + ".logRoller", handler); - Threads.setDaemonThreadRunning(this.cacheFlusher.getThread(), n + ".cacheFlusher", - handler); + this.cacheFlusher.start(handler); Threads.setDaemonThreadRunning(this.compactionChecker.getThread(), n + ".compactionChecker", handler); @@ -1759,7 +1758,7 @@ */ protected void join() { Threads.shutdown(this.compactionChecker.getThread()); - Threads.shutdown(this.cacheFlusher.getThread()); + this.cacheFlusher.join(); if (this.hlogRoller != null) { Threads.shutdown(this.hlogRoller.getThread()); }