Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java	(revision 1437229)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java	(working copy)
@@ -42,6 +42,9 @@
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -108,7 +111,7 @@
 @InterfaceAudience.Private
 class FSHLog implements HLog, Syncable {
   static final Log LOG = LogFactory.getLog(FSHLog.class);
-  
+
   private final FileSystem fs;
   private final Path rootDir;
   private final Path dir;
@@ -129,7 +132,7 @@
   private WALCoprocessorHost coprocessorHost;
   private FSDataOutputStream hdfs_out; // FSDataOutputStream associated with the current SequenceFile.writer
-  // Minimum tolerable replicas, if the actual value is lower than it, 
+  // Minimum tolerable replicas, if the actual value is lower than it,
   // rollWriter will be triggered
   private int minTolerableReplication;
   private Method getNumCurrentReplicas; // refers to DFSOutputStream.getNumCurrentReplicas
@@ -241,10 +244,10 @@
   public FSHLog(final FileSystem fs, final Path root, final String logDir,
                 final Configuration conf)
   throws IOException {
-    this(fs, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME, 
+    this(fs, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME,
         conf, null, true, null, false);
   }
-  
+
   /**
    * Constructor.
    *
@@ -311,7 +314,7 @@
    * @throws IOException
    */
   public FSHLog(final FileSystem fs, final Path root, final String logDir,
-      final String oldLogDir, final Configuration conf, 
+      final String oldLogDir, final Configuration conf,
       final List<WALActionsListener> listeners,
       final boolean failIfLogDirExists, final String prefix, boolean forMeta)
   throws IOException {
@@ -322,15 +325,15 @@
     this.oldLogDir = new Path(this.rootDir, oldLogDir);
     this.forMeta = forMeta;
     this.conf = conf;
-    
+
     if (listeners != null) {
       for (WALActionsListener i: listeners) {
         registerWALActionsListener(i);
       }
     }
-    
+
     this.failIfLogDirExists = failIfLogDirExists;
-    
+
     this.blocksize = this.conf.getLong("hbase.regionserver.hlog.blocksize",
         getDefaultBlockSize());
     // Roll at 95% of block size.
@@ -338,7 +341,7 @@
     this.logrollsize = (long)(this.blocksize * multi);
     this.optionalFlushInterval =
       conf.getLong("hbase.regionserver.optionallogflushinterval", 1 * 1000);
-    
+
     this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", 32);
     this.minTolerableReplication = conf.getInt(
         "hbase.regionserver.hlog.tolerable.lowreplication",
@@ -348,9 +351,9 @@
     this.enabled = conf.getBoolean("hbase.regionserver.hlog.enabled", true);
     this.closeErrorsTolerated = conf.getInt(
         "hbase.regionserver.logroll.errors.tolerated", 0);
-    
+
     this.logSyncerThread = new LogSyncer(this.optionalFlushInterval);
-    
+
     LOG.info("HLog configuration: blocksize=" +
       StringUtils.byteDesc(this.blocksize) +
       ", rollsize=" + StringUtils.byteDesc(this.logrollsize) +
@@ -375,7 +378,7 @@
     }
     // rollWriter sets this.hdfs_out if it can.
     rollWriter();
-    
+
     // handle the reflection necessary to call getNumCurrentReplicas()
     this.getNumCurrentReplicas = getGetNumCurrentReplicas(this.hdfs_out);
 
@@ -385,7 +388,7 @@
 
     this.metrics = new MetricsWAL();
   }
-  
+
   // use reflection to search for getDefaultBlockSize(Path f)
   // if the method doesn't exist, fall back to using getDefaultBlockSize()
   private long getDefaultBlockSize() throws IOException {
@@ -478,7 +481,7 @@
    * @return The wrapped stream our writer is using; its not the
    * writer's 'out' FSDatoOutputStream but the stream that this 'out' wraps
    * (In hdfs its an instance of DFSDataOutputStream).
-   * 
+   *
    * usage: see TestLogRolling.java
    */
   OutputStream getOutputStream() {
@@ -569,7 +572,7 @@
   /**
    * This method allows subclasses to inject different writers without having to
    * extend other methods like rollWriter().
-   * 
+   *
    * @param fs
    * @param path
   * @param conf
@@ -942,7 +945,7 @@
     }
     // Sync if catalog region, and if not then check if that table supports
     // deferred log flushing
-    if (doSync && 
+    if (doSync &&
         (info.isMetaRegion() ||
         !htd.isDeferredLogFlush())) {
       // sync txn to file system
@@ -952,14 +955,14 @@
   }
 
   @Override
-  public long appendNoSync(HRegionInfo info, byte [] tableName, WALEdit edits, 
+  public long appendNoSync(HRegionInfo info, byte [] tableName, WALEdit edits,
     UUID clusterId, final long now, HTableDescriptor htd)
     throws IOException {
     return append(info, tableName, edits, clusterId, now, htd, false);
   }
 
   @Override
-  public long append(HRegionInfo info, byte [] tableName, WALEdit edits, 
+  public long append(HRegionInfo info, byte [] tableName, WALEdit edits,
     UUID clusterId, final long now, HTableDescriptor htd)
     throws IOException {
     return append(info, tableName, edits, clusterId, now, htd, true);
@@ -981,8 +984,8 @@
 
   // List of pending writes to the HLog. There corresponds to transactions
   // that have not yet returned to the client. We keep them cached here
-  // instead of writing them to HDFS piecemeal, because the HDFS write 
-  // method is pretty heavyweight as far as locking is concerned. The 
+  // instead of writing them to HDFS piecemeal, because the HDFS write
+  // method is pretty heavyweight as far as locking is concerned. The
   // goal is to increase the batchsize for writing-to-hdfs as well as
   // sync-to-hdfs, so that we can get better system throughput.
   private List<Entry> pendingWrites = new LinkedList<Entry>();
@@ -1069,7 +1072,7 @@
       // See HBASE-4387, HBASE-5623, HBASE-7329.
       tempWriter = this.writer;
     }
-    // if the transaction that we are interested in is already 
+    // if the transaction that we are interested in is already
     // synced, then return immediately.
     if (txid <= this.syncedTillHere) {
       return;
@@ -1077,7 +1080,7 @@
     try {
       long doneUpto;
       long now = EnvironmentEdgeManager.currentTimeMillis();
-      // First flush all the pending writes to HDFS. Then 
+      // First flush all the pending writes to HDFS. Then
       // issue the sync to HDFS. If sync is successful, then update
       // syncedTillHere to indicate that transactions till this
       // number has been successfully synced.
@@ -1344,13 +1347,13 @@
   /**
    * Get the directory we are making logs in.
-   * 
+   *
    * @return dir
    */
   protected Path getDir() {
     return dir;
   }
-  
+
   static Path getHLogArchivePath(Path oldLogDir, Path p) {
     return new Path(oldLogDir, p.getName());
   }
@@ -1388,7 +1391,7 @@
         conf, baseDir, p, oldLogDir, fs);
     logSplitter.splitLog();
   }
-  
+
   @Override
   public WALCoprocessorHost getCoprocessorHost() {
     return coprocessorHost;
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java	(revision 1437229)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java	(working copy)
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
+import java.lang.Thread.UncaughtExceptionHandler;
 import java.lang.management.ManagementFactory;
 import java.util.ConcurrentModificationException;
 import java.util.HashMap;
@@ -29,10 +30,10 @@
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.DelayQueue;
 import java.util.concurrent.Delayed;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -44,6 +45,7 @@
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.HasThread;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.util.StringUtils;
 import org.cliffc.high_scale_lib.Counter;
 
@@ -59,7 +61,7 @@
  * @see FlushRequester
  */
 @InterfaceAudience.Private
-class MemStoreFlusher extends HasThread implements FlushRequester {
+class MemStoreFlusher implements FlushRequester {
   static final Log LOG = LogFactory.getLog(MemStoreFlusher.class);
   // These two data members go together.  Any entry in the one must have
   // a corresponding entry in the other.
@@ -71,8 +73,8 @@
   private final long threadWakeFrequency;
   private final HRegionServer server;
-  private final ReentrantLock lock = new ReentrantLock();
-  private final Condition flushOccurred = lock.newCondition();
+  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+  private final Object blockSignal = new Object();
 
   protected final long globalMemStoreLimit;
   protected final long globalMemStoreLimitLowMark;
@@ -87,6 +89,9 @@
   private long blockingWaitTime;
   private final Counter updatesBlockedMsHighWater = new Counter();
 
+  private FlushHandler[] flushHandlers = null;
+  private int handlerCount;
+
   /**
    * @param conf
    * @param server
@@ -111,6 +116,7 @@
       conf.getInt("hbase.hstore.blockingStoreFiles", 7);
     this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime",
       90000);
+    this.handlerCount = conf.getInt("hbase.hstore.flusher.count", 1);
     LOG.info("globalMemStoreLimit=" +
       StringUtils.humanReadableInt(this.globalMemStoreLimit) +
       ", globalMemStoreLimitLowMark=" +
@@ -213,64 +219,59 @@
     return true;
   }
 
-  @Override
-  public void run() {
-    while (!this.server.isStopped()) {
-      FlushQueueEntry fqe = null;
-      try {
-        wakeupPending.set(false); // allow someone to wake us up again
-        fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
-        if (fqe == null || fqe instanceof WakeupFlushThread) {
-          if (isAboveLowWaterMark()) {
-            LOG.debug("Flush thread woke up because memory above low water=" +
-              StringUtils.humanReadableInt(this.globalMemStoreLimitLowMark));
-            if (!flushOneForGlobalPressure()) {
-              // Wasn't able to flush any region, but we're above low water mark
-              // This is unlikely to happen, but might happen when closing the
-              // entire server - another thread is flushing regions. We'll just
-              // sleep a little bit to avoid spinning, and then pretend that
-              // we flushed one, so anyone blocked will check again
-              lock.lock();
-              try {
+  private class FlushHandler extends HasThread {
+    @Override
+    public void run() {
+      while (!server.isStopped()) {
+        FlushQueueEntry fqe = null;
+        try {
+          wakeupPending.set(false); // allow someone to wake us up again
+          fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
+          if (fqe == null || fqe instanceof WakeupFlushThread) {
+            if (isAboveLowWaterMark()) {
+              LOG.debug("Flush thread woke up because memory above low water=" +
+                  StringUtils.humanReadableInt(globalMemStoreLimitLowMark));
+              if (!flushOneForGlobalPressure()) {
+                // Wasn't able to flush any region, but we're above low water mark
+                // This is unlikely to happen, but might happen when closing the
+                // entire server - another thread is flushing regions. We'll just
+                // sleep a little bit to avoid spinning, and then pretend that
+                // we flushed one, so anyone blocked will check again
                 Thread.sleep(1000);
-                flushOccurred.signalAll();
-              } finally {
-                lock.unlock();
+                wakeUpIfBlocking();
               }
+              // Enqueue another one of these tokens so we'll wake up again
+              wakeupFlushThread();
             }
-            // Enqueue another one of these tokens so we'll wake up again
-            wakeupFlushThread();
+            continue;
           }
+          FlushRegionEntry fre = (FlushRegionEntry) fqe;
+          if (!flushRegion(fre)) {
+            break;
+          }
+        } catch (InterruptedException ex) {
           continue;
+        } catch (ConcurrentModificationException ex) {
+          continue;
+        } catch (Exception ex) {
+          LOG.error("Cache flusher failed for entry " + fqe, ex);
+          if (!server.checkFileSystem()) {
+            break;
+          }
         }
-        FlushRegionEntry fre = (FlushRegionEntry)fqe;
-        if (!flushRegion(fre)) {
-          break;
-        }
-      } catch (InterruptedException ex) {
-        continue;
-      } catch (ConcurrentModificationException ex) {
-        continue;
-      } catch (Exception ex) {
-        LOG.error("Cache flusher failed for entry " + fqe, ex);
-        if (!server.checkFileSystem()) {
-          break;
-        }
       }
-    }
-    this.regionsInQueue.clear();
-    this.flushQueue.clear();
+      synchronized (regionsInQueue) {
+        regionsInQueue.clear();
+        flushQueue.clear();
+      }
 
-    // Signal anyone waiting, so they see the close flag
-    lock.lock();
-    try {
-      flushOccurred.signalAll();
-    } finally {
-      lock.unlock();
+      // Signal anyone waiting, so they see the close flag
+      wakeUpIfBlocking();
+      LOG.info(getName() + " exiting");
     }
-    LOG.info(getName() + " exiting");
   }
+
   private void wakeupFlushThread() {
     if (wakeupPending.compareAndSet(false, true)) {
       flushQueue.add(new WakeupFlushThread());
@@ -287,6 +288,10 @@
         continue;
       }
 
+      if (region.writestate.flushing || !region.writestate.writesEnabled) {
+        continue;
+      }
+
       if (checkStoreFileCount && isTooManyStoreFiles(region)) {
         continue;
       }
@@ -332,14 +337,44 @@
    * Only interrupt once it's done with a run through the work loop.
    */
   void interruptIfNecessary() {
-    lock.lock();
+    lock.writeLock().lock();
     try {
-      this.interrupt();
+      for (FlushHandler flushHandler : flushHandlers) {
+        if (flushHandler != null) flushHandler.interrupt();
+      }
     } finally {
-      lock.unlock();
+      lock.writeLock().unlock();
     }
   }
 
+  synchronized void start(UncaughtExceptionHandler eh) {
+    ThreadFactory flusherThreadFactory = Threads.newDaemonThreadFactory(
+        server.getServerName().toString() + "-MemStoreFlusher", eh);
+    flushHandlers = new FlushHandler[handlerCount];
+    for (int i = 0; i < flushHandlers.length; i++) {
+      flushHandlers[i] = new FlushHandler();
+      flusherThreadFactory.newThread(flushHandlers[i]);
+      flushHandlers[i].start();
+    }
+  }
+
+  boolean isAlive() {
+    for (FlushHandler flushHandler : flushHandlers) {
+      if (flushHandler != null && flushHandler.isAlive()) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  void join() {
+    for (FlushHandler flushHandler : flushHandlers) {
+      if (flushHandler != null) {
+        Threads.shutdown(flushHandler.getThread());
+      }
+    }
+  }
+
   /*
    * A flushRegion that checks store file count. If too many, puts the flush
    * on delay queue to retry later.
@@ -365,7 +400,8 @@
           "store files; delaying flush up to " + this.blockingWaitTime + "ms");
       if (!this.server.compactSplitThread.requestSplit(region)) {
         try {
-          this.server.compactSplitThread.requestCompaction(region, getName());
+          this.server.compactSplitThread.requestCompaction(region, Thread
+              .currentThread().getName());
         } catch (IOException e) {
           LOG.error(
             "Cache flush failed for region " + Bytes.toStringBinary(region.getRegionName()),
@@ -404,8 +440,8 @@
         // emergencyFlush, then item was removed via a flushQueue.poll.
         flushQueue.remove(fqe);
       }
-      lock.lock();
     }
+    lock.readLock().lock();
     try {
       boolean shouldCompact = region.flushcache();
       // We just want to check the size
@@ -413,7 +449,7 @@
       if (shouldSplit) {
         this.server.compactSplitThread.requestSplit(region);
       } else if (shouldCompact) {
-        server.compactSplitThread.requestCompaction(region, getName());
+        server.compactSplitThread.requestCompaction(region, Thread.currentThread().getName());
       }
 
     } catch (DroppedSnapshotException ex) {
@@ -432,15 +468,18 @@
         return false;
       }
     } finally {
-      try {
-        flushOccurred.signalAll();
-      } finally {
-        lock.unlock();
-      }
+      lock.readLock().unlock();
+      wakeUpIfBlocking();
     }
     return true;
   }
 
+  private void wakeUpIfBlocking() {
+    synchronized (blockSignal) {
+      blockSignal.notifyAll();
+    }
+  }
+
   private boolean isTooManyStoreFiles(HRegion region) {
     for (Store hstore : region.stores.values()) {
       if (hstore.getStorefilesCount() > this.blockingStoreFilesNumber) {
@@ -458,12 +497,12 @@
    */
   public void reclaimMemStoreMemory() {
     if (isAboveHighWaterMark()) {
-      lock.lock();
-      try {
+      long start = System.currentTimeMillis();
+      synchronized (this.blockSignal) {
         boolean blocked = false;
         long startTime = 0;
         while (isAboveHighWaterMark() && !server.isStopped()) {
-          if(!blocked){
+          if (!blocked) {
            startTime = EnvironmentEdgeManager.currentTimeMillis();
             LOG.info("Blocking updates on " + server.toString() +
               ": the global memstore size " +
@@ -476,10 +515,12 @@
           try {
             // we should be able to wait forever, but we've seen a bug where
             // we miss a notify, so put a 5 second bound on it at least.
-            flushOccurred.await(5, TimeUnit.SECONDS);
+            blockSignal.wait(5 * 1000);
           } catch (InterruptedException ie) {
             Thread.currentThread().interrupt();
           }
+          long took = System.currentTimeMillis() - start;
+          LOG.warn("Memstore is above high water mark; blocked updates for " + took + "ms");
         }
         if(blocked){
           final long totalTime = EnvironmentEdgeManager.currentTimeMillis() - startTime;
@@ -488,8 +529,6 @@
           }
           LOG.info("Unblocking updates for server " + server.toString());
         }
-      } finally {
-        lock.unlock();
       }
     } else if (isAboveLowWaterMark()) {
       wakeupFlushThread();
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java	(revision 1437229)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java	(working copy)
@@ -1531,8 +1531,7 @@
 
     Threads.setDaemonThreadRunning(this.hlogRoller.getThread(), n + ".logRoller",
         uncaughtExceptionHandler);
-    Threads.setDaemonThreadRunning(this.cacheFlusher.getThread(), n + ".cacheFlusher",
-        uncaughtExceptionHandler);
+    this.cacheFlusher.start(uncaughtExceptionHandler);
     Threads.setDaemonThreadRunning(this.compactionChecker.getThread(), n + ".compactionChecker",
         uncaughtExceptionHandler);
     if (this.healthCheckChore != null) {
@@ -1790,7 +1789,7 @@
    */
   protected void join() {
     Threads.shutdown(this.compactionChecker.getThread());
-    Threads.shutdown(this.cacheFlusher.getThread());
+    this.cacheFlusher.join();
     if (this.healthCheckChore != null) {
       Threads.shutdown(this.healthCheckChore.getThread());
     }
Index: hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java
===================================================================
--- hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java	(revision 1437229)
+++ hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java	(working copy)
@@ -208,16 +208,30 @@
   }
 
   /**
-   * Get a named {@link ThreadFactory} that just builds daemon threads
+   * Same as {@link #newDaemonThreadFactory(String, UncaughtExceptionHandler)},
+   * without setting the exception handler.
+   */
+  public static ThreadFactory newDaemonThreadFactory(final String prefix) {
+    return newDaemonThreadFactory(prefix, null);
+  }
+
+  /**
+   * Get a named {@link ThreadFactory} that just builds daemon threads.
    * @param prefix name prefix for all threads created from the factory
-   * @return a thread factory that creates named, daemon threads
+   * @param handler uncaught exception handler to set for all threads
+   * @return a thread factory that creates named, daemon threads with
+   *         the supplied exception handler and normal priority
    */
-  public static ThreadFactory newDaemonThreadFactory(final String prefix) {
+  public static ThreadFactory newDaemonThreadFactory(final String prefix,
+      final UncaughtExceptionHandler handler) {
     final ThreadFactory namedFactory = getNamedThreadFactory(prefix);
     return new ThreadFactory() {
       @Override
       public Thread newThread(Runnable r) {
         Thread t = namedFactory.newThread(r);
+        if (handler != null) {
+          t.setUncaughtExceptionHandler(handler);
+        }
         if (!t.isDaemon()) {
           t.setDaemon(true);
         }
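Taken together, the MemStoreFlusher changes replace the single flusher thread with a pool of FlushHandler threads draining one shared flush queue, sized by the new hbase.hstore.flusher.count setting. The sketch below is a minimal, self-contained illustration of that pattern only, not the patch's code; the class and all names in it are hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for MemStoreFlusher's handler pool: N daemon
// threads poll one shared queue, so several flushes can run concurrently.
public class FlushHandlerPoolSketch {
  private final BlockingQueue<Runnable> flushQueue = new LinkedBlockingQueue<>();
  private final Thread[] handlers;
  private volatile boolean stopped = false;

  public FlushHandlerPoolSketch(int handlerCount) {
    handlers = new Thread[handlerCount];
    for (int i = 0; i < handlerCount; i++) {
      handlers[i] = new Thread(this::runLoop, "FlushHandler-" + i);
      handlers[i].setDaemon(true);
      handlers[i].start();
    }
  }

  private void runLoop() {
    while (!stopped) {
      try {
        // Bounded poll keeps every handler responsive to the stop flag,
        // mirroring flushQueue.poll(threadWakeFrequency, MILLISECONDS).
        Runnable task = flushQueue.poll(100, TimeUnit.MILLISECONDS);
        if (task != null) {
          task.run();
        }
      } catch (InterruptedException ex) {
        // Same policy as the patch: loop around and re-check the stop flag.
      }
    }
  }

  public void requestFlush(Runnable task) {
    flushQueue.add(task);
  }

  public void stop() throws InterruptedException {
    stopped = true;
    for (Thread t : handlers) {
      t.join();
    }
  }

  public static void main(String[] args) throws Exception {
    // Two handlers, analogous to setting hbase.hstore.flusher.count=2.
    FlushHandlerPoolSketch pool = new FlushHandlerPoolSketch(2);
    pool.requestFlush(() -> System.out.println("flush region A"));
    pool.requestFlush(() -> System.out.println("flush region B"));
    Thread.sleep(500);
    pool.stop();
  }
}
```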
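With several concurrent flushers, the old exclusive ReentrantLock/Condition pair would have serialized them again, so the patch splits the two roles: flushRegion() holds the read lock of a ReentrantReadWriteLock (many flushes in parallel, with interruptIfNecessary() taking the write lock), while blocked writers park on a plain monitor with a bounded wait. A minimal sketch of the monitor half, with made-up names and thresholds:

```java
// Illustrative monitor-based blocking, mirroring reclaimMemStoreMemory():
// writers wait on a plain Object with a bounded timeout (guarding against
// a missed notify), and flushers notifyAll() after releasing memory.
public class BoundedWaitSketch {
  private final Object blockSignal = new Object();
  private long memstoreSize = 100;       // guarded by blockSignal
  private final long highWaterMark = 50; // arbitrary example threshold

  // Write path: block while the memstore is over the high water mark.
  public void blockWhileAboveHighWaterMark() {
    synchronized (blockSignal) {
      while (memstoreSize > highWaterMark) {
        try {
          // Bounded wait: even if a notify is missed, the condition is
          // re-checked at least every 5 seconds (same bound as the patch).
          blockSignal.wait(5 * 1000);
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
          return;
        }
      }
    }
  }

  // Flush path: record the released memory and wake any blocked writers.
  public void onFlushCompleted(long freed) {
    synchronized (blockSignal) {
      memstoreSize -= freed;
      blockSignal.notifyAll();
    }
  }
}
```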
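The new two-argument Threads.newDaemonThreadFactory(prefix, handler) is what MemStoreFlusher.start() uses to name its handler threads and wire in the region server's UncaughtExceptionHandler. A possible usage, assuming only the API added by this patch (the class name and prefix here are illustrative, and the lambda assumes a Java 8+ compiler):

```java
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.concurrent.ThreadFactory;

import org.apache.hadoop.hbase.util.Threads;

public class DaemonFactoryUsageSketch {
  public static void main(String[] args) throws InterruptedException {
    // Report crashes of factory-built threads; here we just log them.
    UncaughtExceptionHandler handler =
        (t, e) -> System.err.println(t.getName() + " died: " + e);

    // Named, daemon threads with the handler pre-installed.
    ThreadFactory factory =
        Threads.newDaemonThreadFactory("example-prefix", handler);

    Thread worker = factory.newThread(() -> {
      throw new RuntimeException("boom");
    });
    worker.start(); // the handler prints the failure
    worker.join();
  }
}
```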