### Eclipse Workspace Patch 1.0
#P apache-trunk
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java (revision 1437306)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java (working copy)
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
+import java.lang.Thread.UncaughtExceptionHandler;
 import java.lang.management.ManagementFactory;
 import java.util.ConcurrentModificationException;
 import java.util.HashMap;
@@ -29,10 +30,10 @@
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.DelayQueue;
 import java.util.concurrent.Delayed;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -44,6 +45,7 @@
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.HasThread;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.util.StringUtils;
 import org.cliffc.high_scale_lib.Counter;
 
@@ -59,7 +61,7 @@
  * @see FlushRequester
  */
 @InterfaceAudience.Private
-class MemStoreFlusher extends HasThread implements FlushRequester {
+class MemStoreFlusher implements FlushRequester {
   static final Log LOG = LogFactory.getLog(MemStoreFlusher.class);
   // These two data members go together. Any entry in the one must have
   // a corresponding entry in the other.
@@ -71,8 +73,8 @@
   private final long threadWakeFrequency;
   private final HRegionServer server;
-  private final ReentrantLock lock = new ReentrantLock();
-  private final Condition flushOccurred = lock.newCondition();
+  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+  private final Object blockSignal = new Object();
 
   protected final long globalMemStoreLimit;
   protected final long globalMemStoreLimitLowMark;
@@ -87,6 +89,9 @@
   private long blockingWaitTime;
   private final Counter updatesBlockedMsHighWater = new Counter();
 
+  private FlushHandler[] flushHandlers = null;
+  private int handlerCount;
+
   /**
    * @param conf
    * @param server
@@ -111,6 +116,7 @@
       conf.getInt("hbase.hstore.blockingStoreFiles", 7);
     this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime",
       90000);
+    this.handlerCount = conf.getInt("hbase.hstore.flusher.count", 1);
     LOG.info("globalMemStoreLimit=" +
       StringUtils.humanReadableInt(this.globalMemStoreLimit) +
       ", globalMemStoreLimitLowMark=" +
@@ -213,64 +219,59 @@
     return true;
   }
 
-  @Override
-  public void run() {
-    while (!this.server.isStopped()) {
-      FlushQueueEntry fqe = null;
-      try {
-        wakeupPending.set(false); // allow someone to wake us up again
-        fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
-        if (fqe == null || fqe instanceof WakeupFlushThread) {
-          if (isAboveLowWaterMark()) {
-            LOG.debug("Flush thread woke up because memory above low water=" +
-              StringUtils.humanReadableInt(this.globalMemStoreLimitLowMark));
-            if (!flushOneForGlobalPressure()) {
-              // Wasn't able to flush any region, but we're above low water mark
-              // This is unlikely to happen, but might happen when closing the
-              // entire server - another thread is flushing regions. We'll just
-              // sleep a little bit to avoid spinning, and then pretend that
-              // we flushed one, so anyone blocked will check again
-              lock.lock();
-              try {
+  private class FlushHandler extends HasThread {
+    @Override
+    public void run() {
+      while (!server.isStopped()) {
+        FlushQueueEntry fqe = null;
+        try {
+          wakeupPending.set(false); // allow someone to wake us up again
+          fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
+          if (fqe == null || fqe instanceof WakeupFlushThread) {
+            if (isAboveLowWaterMark()) {
+              LOG.debug("Flush thread woke up because memory above low water=" +
+                StringUtils.humanReadableInt(globalMemStoreLimitLowMark));
+              if (!flushOneForGlobalPressure()) {
+                // Wasn't able to flush any region, but we're above low water mark
+                // This is unlikely to happen, but might happen when closing the
+                // entire server - another thread is flushing regions. We'll just
+                // sleep a little bit to avoid spinning, and then pretend that
+                // we flushed one, so anyone blocked will check again
                 Thread.sleep(1000);
-                flushOccurred.signalAll();
-              } finally {
-                lock.unlock();
+                wakeUpIfBlocking();
               }
+              // Enqueue another one of these tokens so we'll wake up again
+              wakeupFlushThread();
             }
-            // Enqueue another one of these tokens so we'll wake up again
-            wakeupFlushThread();
+            continue;
           }
+          FlushRegionEntry fre = (FlushRegionEntry) fqe;
+          if (!flushRegion(fre)) {
+            break;
+          }
+        } catch (InterruptedException ex) {
           continue;
+        } catch (ConcurrentModificationException ex) {
+          continue;
+        } catch (Exception ex) {
+          LOG.error("Cache flusher failed for entry " + fqe, ex);
+          if (!server.checkFileSystem()) {
+            break;
+          }
         }
-        FlushRegionEntry fre = (FlushRegionEntry)fqe;
-        if (!flushRegion(fre)) {
-          break;
-        }
-      } catch (InterruptedException ex) {
-        continue;
-      } catch (ConcurrentModificationException ex) {
-        continue;
-      } catch (Exception ex) {
-        LOG.error("Cache flusher failed for entry " + fqe, ex);
-        if (!server.checkFileSystem()) {
-          break;
-        }
       }
-    }
-    this.regionsInQueue.clear();
-    this.flushQueue.clear();
+      synchronized (regionsInQueue) {
+        regionsInQueue.clear();
+        flushQueue.clear();
+      }
 
-    // Signal anyone waiting, so they see the close flag
-    lock.lock();
-    try {
-      flushOccurred.signalAll();
-    } finally {
-      lock.unlock();
+      // Signal anyone waiting, so they see the close flag
+      wakeUpIfBlocking();
+      LOG.info(getName() + " exiting");
     }
-    LOG.info(getName() + " exiting");
   }
+
   private void wakeupFlushThread() {
     if (wakeupPending.compareAndSet(false, true)) {
       flushQueue.add(new WakeupFlushThread());
@@ -287,6 +288,10 @@
         continue;
       }
 
+      if (region.writestate.flushing || !region.writestate.writesEnabled) {
+        continue;
+      }
+
       if (checkStoreFileCount && isTooManyStoreFiles(region)) {
         continue;
       }
@@ -332,14 +337,44 @@
    * Only interrupt once it's done with a run through the work loop.
    */
   void interruptIfNecessary() {
-    lock.lock();
+    lock.writeLock().lock();
     try {
-      this.interrupt();
+      for (FlushHandler flushHandler : flushHandlers) {
+        if (flushHandler != null) flushHandler.interrupt();
+      }
     } finally {
-      lock.unlock();
+      lock.writeLock().unlock();
     }
   }
 
+  synchronized void start(UncaughtExceptionHandler eh) {
+    ThreadFactory flusherThreadFactory = Threads.newDaemonThreadFactory(
+        server.getServerName().toString() + "-MemStoreFlusher", eh);
+    flushHandlers = new FlushHandler[handlerCount];
+    for (int i = 0; i < flushHandlers.length; i++) {
+      flushHandlers[i] = new FlushHandler();
+      flusherThreadFactory.newThread(flushHandlers[i]);
+      flushHandlers[i].start();
+    }
+  }
+
+  boolean isAlive() {
+    for (FlushHandler flushHandler : flushHandlers) {
+      if (flushHandler != null && flushHandler.isAlive()) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  void join() {
+    for (FlushHandler flushHandler : flushHandlers) {
+      if (flushHandler != null) {
+        Threads.shutdown(flushHandler.getThread());
+      }
+    }
+  }
+
   /*
    * A flushRegion that checks store file count. If too many, puts the flush
    * on delay queue to retry later.
@@ -365,7 +400,8 @@
           "store files; delaying flush up to " + this.blockingWaitTime + "ms");
       if (!this.server.compactSplitThread.requestSplit(region)) {
         try {
-          this.server.compactSplitThread.requestCompaction(region, getName());
+          this.server.compactSplitThread.requestCompaction(region, Thread
+              .currentThread().getName());
         } catch (IOException e) {
           LOG.error(
             "Cache flush failed for region " + Bytes.toStringBinary(region.getRegionName()),
@@ -404,8 +440,8 @@
         // emergencyFlush, then item was removed via a flushQueue.poll.
         flushQueue.remove(fqe);
       }
-      lock.lock();
     }
+    lock.readLock().lock();
     try {
       boolean shouldCompact = region.flushcache();
       // We just want to check the size
@@ -413,7 +449,7 @@
       if (shouldSplit) {
         this.server.compactSplitThread.requestSplit(region);
       } else if (shouldCompact) {
-        server.compactSplitThread.requestCompaction(region, getName());
+        server.compactSplitThread.requestCompaction(region, Thread.currentThread().getName());
       }
 
     } catch (DroppedSnapshotException ex) {
@@ -432,15 +468,18 @@
         return false;
       }
     } finally {
-      try {
-        flushOccurred.signalAll();
-      } finally {
-        lock.unlock();
-      }
+      lock.readLock().unlock();
+      wakeUpIfBlocking();
     }
     return true;
   }
 
+  private void wakeUpIfBlocking() {
+    synchronized (blockSignal) {
+      blockSignal.notifyAll();
+    }
+  }
+
   private boolean isTooManyStoreFiles(HRegion region) {
     for (Store hstore : region.stores.values()) {
       if (hstore.getStorefilesCount() > this.blockingStoreFilesNumber) {
@@ -458,12 +497,12 @@
    */
   public void reclaimMemStoreMemory() {
     if (isAboveHighWaterMark()) {
-      lock.lock();
-      try {
+      long start = System.currentTimeMillis();
+      synchronized (this.blockSignal) {
         boolean blocked = false;
         long startTime = 0;
         while (isAboveHighWaterMark() && !server.isStopped()) {
-          if(!blocked){
+          if (!blocked) {
             startTime = EnvironmentEdgeManager.currentTimeMillis();
             LOG.info("Blocking updates on " + server.toString() + ": the global memstore size " +
@@ -476,10 +515,12 @@
           try {
             // we should be able to wait forever, but we've seen a bug where
            // we miss a notify, so put a 5 second bound on it at least.
-            flushOccurred.await(5, TimeUnit.SECONDS);
+            blockSignal.wait(5 * 1000);
           } catch (InterruptedException ie) {
             Thread.currentThread().interrupt();
           }
+          long took = System.currentTimeMillis() - start;
+          LOG.warn("Memstore is above high water mark and blocked updates for " + took + "ms");
         }
         if(blocked){
           final long totalTime = EnvironmentEdgeManager.currentTimeMillis() - startTime;
@@ -488,8 +529,6 @@
       }
       LOG.info("Unblocking updates for server " + server.toString());
       }
-    } finally {
-      lock.unlock();
     }
     } else if (isAboveLowWaterMark()) {
       wakeupFlushThread();
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java (revision 1393126)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java (working copy)
@@ -247,12 +247,16 @@
       put.add(HConstants.CATALOG_FAMILY, null, value);
       table.put(put);
     }
+    Put tmpPut = new Put(Bytes.toBytes("tmprow"));
+    tmpPut.add(HConstants.CATALOG_FAMILY, null, value);
     long startTime = System.currentTimeMillis();
     long remaining = timeout;
     while (remaining > 0) {
       if (log.isLowReplicationRollEnabled() == expect) {
         break;
       } else {
+        // Trigger calling FSHLog#checkLowReplication()
+        table.put(tmpPut);
         try {
           Thread.sleep(200);
         } catch (InterruptedException e) {
@@ -371,7 +375,8 @@
     assertTrue(dfsCluster.stopDataNode(pipeline[1].getName()) != null);
     batchWriteAndWait(table, 3, false, 10000);
 
-    assertTrue("LowReplication Roller should've been disabled",
+    assertTrue("LowReplication Roller should've been disabled, current replication=" +
+        ((FSHLog) log).getLogReplication(),
         !log.isLowReplicationRollEnabled());
     dfsCluster
Index: hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java
===================================================================
--- hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java (revision 1437306)
+++ hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java (working copy)
@@ -208,16 +208,30 @@
   }
 
   /**
-   * Get a named {@link ThreadFactory} that just builds daemon threads
+   * Same as {@link #newDaemonThreadFactory(String, UncaughtExceptionHandler)},
+   * without setting the exception handler.
+   */
+  public static ThreadFactory newDaemonThreadFactory(final String prefix) {
+    return newDaemonThreadFactory(prefix, null);
+  }
+
+  /**
+   * Get a named {@link ThreadFactory} that just builds daemon threads.
    * @param prefix name prefix for all threads created from the factory
-   * @return a thread factory that creates named, daemon threads
+   * @param handler unhandled exception handler to set for all threads
+   * @return a thread factory that creates named, daemon threads with
+   *         the supplied exception handler and normal priority
    */
-  public static ThreadFactory newDaemonThreadFactory(final String prefix) {
+  public static ThreadFactory newDaemonThreadFactory(final String prefix,
+      final UncaughtExceptionHandler handler) {
     final ThreadFactory namedFactory = getNamedThreadFactory(prefix);
     return new ThreadFactory() {
       @Override
       public Thread newThread(Runnable r) {
         Thread t = namedFactory.newThread(r);
+        if (handler != null) {
+          t.setUncaughtExceptionHandler(handler);
+        }
         if (!t.isDaemon()) {
           t.setDaemon(true);
         }
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1437306)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy)
@@ -1531,8 +1531,7 @@
     Threads.setDaemonThreadRunning(this.hlogRoller.getThread(), n + ".logRoller",
         uncaughtExceptionHandler);
-    Threads.setDaemonThreadRunning(this.cacheFlusher.getThread(), n + ".cacheFlusher",
-        uncaughtExceptionHandler);
+    this.cacheFlusher.start(uncaughtExceptionHandler);
     Threads.setDaemonThreadRunning(this.compactionChecker.getThread(), n +
       ".compactionChecker", uncaughtExceptionHandler);
     if (this.healthCheckChore != null) {
@@ -1790,7 +1789,7 @@
    */
   protected void join() {
     Threads.shutdown(this.compactionChecker.getThread());
-    Threads.shutdown(this.cacheFlusher.getThread());
+    this.cacheFlusher.join();
     if (this.healthCheckChore != null) {
       Threads.shutdown(this.healthCheckChore.getThread());
     }
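
Note for reviewers, not part of the patch itself: a minimal, hypothetical sketch of how the new two-argument Threads.newDaemonThreadFactory overload introduced above can be exercised in isolation. The class name FlusherFactoryExample and the "example-MemStoreFlusher" prefix are illustrative only; the handler plays the same role as the uncaughtExceptionHandler that HRegionServer now passes to MemStoreFlusher.start(eh), and the number of FlushHandler threads started there is read from hbase.hstore.flusher.count (default 1).

import java.lang.Thread.UncaughtExceptionHandler;
import java.util.concurrent.ThreadFactory;

import org.apache.hadoop.hbase.util.Threads;

public class FlusherFactoryExample {
  public static void main(String[] args) throws InterruptedException {
    // Illustrative handler; HRegionServer supplies its own uncaughtExceptionHandler.
    UncaughtExceptionHandler handler = new UncaughtExceptionHandler() {
      @Override
      public void uncaughtException(Thread t, Throwable e) {
        System.err.println(t.getName() + " failed: " + e);
      }
    };
    // New overload from this patch: named daemon threads that share one handler.
    ThreadFactory factory = Threads.newDaemonThreadFactory("example-MemStoreFlusher", handler);
    Thread worker = factory.newThread(new Runnable() {
      @Override
      public void run() {
        throw new RuntimeException("boom"); // routed to the handler above
      }
    });
    worker.start();
    worker.join();
  }
}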