Index: src/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
===================================================================
--- src/java/org/apache/hadoop/hbase/regionserver/LogRoller.java	(revision 649232)
+++ src/java/org/apache/hadoop/hbase/regionserver/LogRoller.java	(working copy)
@@ -20,6 +20,8 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -29,21 +31,20 @@
 /** Runs periodically to determine if the HLog should be rolled */
 class LogRoller extends Thread implements LogRollListener {
   static final Log LOG = LogFactory.getLog(LogRoller.class);
-  private final Integer rollLock = new Integer(0);
+  private final ReentrantLock rollLock = new ReentrantLock();
   private final long optionalLogRollInterval;
   private long lastLogRollTime;
-  private volatile boolean rollLog;
+  private final AtomicBoolean rollLog = new AtomicBoolean(false);
   private final HRegionServer server;
   private final HBaseConfiguration conf;
 
-  /** constructor */
+  /** @param server */
   public LogRoller(final HRegionServer server) {
     super();
     this.server = server;
     conf = server.conf;
     this.optionalLogRollInterval = conf.getLong(
         "hbase.regionserver.optionallogrollinterval", 30L * 60L * 1000L);
-    this.rollLog = false;
     lastLogRollTime = System.currentTimeMillis();
   }
 
@@ -51,51 +52,62 @@
   @Override
   public void run() {
     while (!server.isStopRequested()) {
-      while (!rollLog && !server.isStopRequested()) {
+      while (!rollLog.get() && !server.isStopRequested()) {
         long now = System.currentTimeMillis();
         if (this.lastLogRollTime + this.optionalLogRollInterval <= now) {
-          rollLog = true;
+          rollLog.set(true);
           this.lastLogRollTime = now;
         } else {
-          synchronized (rollLock) {
+          synchronized (rollLog) {
             try {
-              rollLock.wait(server.threadWakeFrequency);
+              rollLog.wait(server.threadWakeFrequency);
             } catch (InterruptedException e) {
               continue;
             }
           }
         }
       }
-      if (!rollLog) {
+      if (!rollLog.get()) {
         // There's only two reasons to break out of the while loop.
         // 1. Log roll requested
         // 2. Stop requested
         // so if a log roll was not requested, continue and break out of loop
         continue;
       }
-      synchronized (server.logRollerLock) {
-        try {
-          LOG.info("Rolling hlog. Number of entries: " + server.getLog().getNumEntries());
-          server.getLog().rollWriter();
-        } catch (IOException ex) {
-          LOG.error("Log rolling failed",
+      rollLock.lock();          // Don't interrupt us. We're working
+      try {
+        LOG.info("Rolling hlog. Number of entries: " + server.getLog().getNumEntries());
+        server.getLog().rollWriter();
+      } catch (IOException ex) {
+        LOG.error("Log rolling failed",
             RemoteExceptionHandler.checkIOException(ex));
-          server.checkFileSystem();
-        } catch (Exception ex) {
-          LOG.error("Log rolling failed", ex);
-          server.checkFileSystem();
-        } finally {
-          rollLog = false;
-        }
+        server.checkFileSystem();
+      } catch (Exception ex) {
+        LOG.error("Log rolling failed", ex);
+        server.checkFileSystem();
+      } finally {
+        rollLog.set(false);
+        rollLock.unlock();
       }
     }
+    LOG.info("LogRoller exiting.");
   }
 
   /** {@inheritDoc} */
   public void logRollRequested() {
-    synchronized (rollLock) {
-      rollLog = true;
-      rollLock.notifyAll();
+    synchronized (rollLog) {
+      rollLog.set(true);
+      rollLog.notifyAll();
     }
   }
+
+  /**
+   * Called by region server to wake up this thread if it sleeping.
+   * It is sleeping if rollLock is not held.
+   */
+  public void interruptIfNecessary() {
+    if (rollLock.tryLock()) {
+      this.interrupt();
+    }
+  }
 }
\ No newline at end of file

Index: src/java/org/apache/hadoop/hbase/regionserver/Flusher.java
===================================================================
--- src/java/org/apache/hadoop/hbase/regionserver/Flusher.java	(revision 649232)
+++ src/java/org/apache/hadoop/hbase/regionserver/Flusher.java	(working copy)
@@ -22,6 +22,7 @@
 import java.io.IOException;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.TimeUnit;
 import java.util.HashSet;
 import java.util.Set;
@@ -48,7 +49,7 @@
   private final long threadWakeFrequency;
   private final long optionalFlushPeriod;
   private final HRegionServer server;
-  private final Integer lock = new Integer(0);
+  private final ReentrantLock lock = new ReentrantLock();
   private final Integer memcacheSizeLock = new Integer(0);
 
   private long lastOptionalCheck = System.currentTimeMillis();
@@ -84,7 +85,10 @@
       try {
         enqueueOptionalFlushRegions();
         r = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
-        if (!flushImmediately(r)) {
+        if (r == null) {
+          continue;
+        }
+        if (!flushRegion(r, false)) {
           break;
         }
       } catch (InterruptedException ex) {
@@ -118,9 +122,9 @@
   /**
    * Only interrupt once it's done with a run through the work loop.
    */
-  void interruptPolitely() {
-    synchronized (lock) {
-      interrupt();
+  void interruptIfNecessary() {
+    if (lock.tryLock()) {
+      this.interrupt();
     }
   }
 
@@ -128,40 +132,41 @@
    * Flush a region right away, while respecting concurrency with the async
    * flushing that is always going on.
    */
-  private boolean flushImmediately(HRegion region) {
-    try {
-      if (region != null) {
-        synchronized (regionsInQueue) {
-          // take the region out of the set and the queue, if it happens to be
-          // in the queue. this didn't used to be a constraint, but now that
-          // HBASE-512 is in play, we need to try and limit double-flushing
-          // regions.
-          regionsInQueue.remove(region);
-          flushQueue.remove(region);
+  private boolean flushRegion(HRegion region, boolean removeFromQueue) {
+    synchronized (regionsInQueue) {
+      // take the region out of the set. If removeFromQueue is true, remove it
+      // from the queue too if it is there. This didn't used to be a constraint,
+      // but now that HBASE-512 is in play, we need to try and limit
+      // double-flushing of regions.
+      if (regionsInQueue.remove(region) && removeFromQueue) {
+        flushQueue.remove(region);
+      }
+      lock.lock();
+      try {
+        if (region.flushcache()) {
+          server.compactSplitThread.compactionRequested(region);
         }
-        synchronized (lock) { // Don't interrupt while we're working
-          if (region.flushcache()) {
-            server.compactSplitThread.compactionRequested(region);
-          }
+      } catch (DroppedSnapshotException ex) {
+        // Cache flush can fail in a few places. If it fails in a critical
+        // section, we get a DroppedSnapshotException and a replay of hlog
+        // is required. Currently the only way to do this is a restart of
+        // the server.
+        LOG.fatal("Replay of hlog required. Forcing server restart", ex);
+        if (!server.checkFileSystem()) {
+          return false;
         }
-      }
-    } catch (DroppedSnapshotException ex) {
-      // Cache flush can fail in a few places. If it fails in a critical
-      // section, we get a DroppedSnapshotException and a replay of hlog
-      // is required. Currently the only way to do this is a restart of
-      // the server.
-      LOG.fatal("Replay of hlog required. Forcing server restart", ex);
-      if (!server.checkFileSystem()) {
+        server.stop();
         return false;
+      } catch (IOException ex) {
+        LOG.error("Cache flush failed" +
+          (region != null ? (" for region " + region.getRegionName()) : ""),
+          RemoteExceptionHandler.checkIOException(ex));
+        if (!server.checkFileSystem()) {
+          return false;
+        }
+      } finally {
+        lock.unlock();
       }
-      server.stop();
-    } catch (IOException ex) {
-      LOG.error("Cache flush failed" +
-        (region != null ? (" for region " + region.getRegionName()) : ""),
-        RemoteExceptionHandler.checkIOException(ex));
-      if (!server.checkFileSystem()) {
-        return false;
-      }
-    }
     }
     return true;
   }
@@ -223,7 +228,10 @@
       // flush the region with the biggest memcache
       HRegion biggestMemcacheRegion =
         sortedRegions.remove(sortedRegions.firstKey());
-      flushImmediately(biggestMemcacheRegion);
+      if (!flushRegion(biggestMemcacheRegion, true)) {
+        // Something bad happened - give up.
+        break;
+      }
     }
   }

Index: src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
===================================================================
--- src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java	(revision 649232)
+++ src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java	(working copy)
@@ -22,7 +22,6 @@
 import java.io.IOException;
 import java.lang.Thread.UncaughtExceptionHandler;
 import java.lang.reflect.Constructor;
-import java.lang.reflect.Member;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.util.Arrays;
@@ -47,7 +46,6 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.dfs.AlreadyBeingCreatedException;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -190,7 +188,6 @@
   // eclipse warning when accessed by inner classes
   protected HLog log;
   final LogRoller logRoller;
-  final Integer logRollerLock = new Integer(0);
 
   // flag set after we're done setting up server threads (used for testing)
   protected volatile boolean isOnline;
@@ -323,23 +320,21 @@
           // this message.
           if (checkFileSystem()) {
             closeAllRegions();
-            synchronized (logRollerLock) {
-              try {
-                log.closeAndDelete();
-              } catch (Exception e) {
-                LOG.error("error closing and deleting HLog", e);
-              }
-              try {
-                serverInfo.setStartCode(System.currentTimeMillis());
-                log = setupHLog();
-              } catch (IOException e) {
-                this.abortRequested = true;
-                this.stopRequested.set(true);
-                e = RemoteExceptionHandler.checkIOException(e);
-                LOG.fatal("error restarting server", e);
-                break;
-              }
+            try {
+              log.closeAndDelete();
+            } catch (Exception e) {
+              LOG.error("error closing and deleting HLog", e);
             }
+            try {
+              serverInfo.setStartCode(System.currentTimeMillis());
+              log = setupHLog();
+            } catch (IOException e) {
+              this.abortRequested = true;
+              this.stopRequested.set(true);
+              e = RemoteExceptionHandler.checkIOException(e);
+              LOG.fatal("error restarting server", e);
+              break;
+            }
             reportForDuty(sleeper);
             restart = true;
           } else {
@@ -422,11 +417,9 @@
     // Send interrupts to wake up threads if sleeping so they notice shutdown.
     // TODO: Should we check they are alive?  If OOME could have exited already
-    cacheFlusher.interruptPolitely();
-    compactSplitThread.interruptPolitely();
-    synchronized (logRollerLock) {
-      this.logRoller.interrupt();
-    }
+    cacheFlusher.interruptIfNecessary();
+    compactSplitThread.interruptIfNecessary();
+    this.logRoller.interruptIfNecessary();
 
     if (abortRequested) {
       if (this.fsOk) {
@@ -470,7 +463,6 @@
       LOG.info("stopping server at: " +
         serverInfo.getServerAddress().toString());
     }
-    join();
     LOG.info(Thread.currentThread().getName() + " exiting");
   }

Index: src/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
===================================================================
--- src/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java	(revision 649232)
+++ src/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java	(working copy)
@@ -24,6 +24,7 @@
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.util.StringUtils;
@@ -48,7 +49,7 @@
   private HTable meta = null;
   private volatile long startTime;
   private final long frequency;
-  private final Integer lock = new Integer(0);
+  private final ReentrantLock lock = new ReentrantLock();
 
   private final HRegionServer server;
   private final HBaseConfiguration conf;
@@ -79,12 +80,15 @@
           synchronized (regionsInQueue) {
             regionsInQueue.remove(r);
           }
-          synchronized (lock) {
+          lock.lock();
+          try {
             // Don't interrupt us while we are working
             Text midKey = r.compactStores();
             if (midKey != null) {
               split(r, midKey);
             }
+          } finally {
+            lock.unlock();
           }
         }
       } catch (InterruptedException ex) {
@@ -218,9 +222,9 @@
   /**
    * Only interrupt once it's done with a run through the work loop.
    */
-  void interruptPolitely() {
-    synchronized (lock) {
-      interrupt();
+  void interruptIfNecessary() {
+    if (lock.tryLock()) {
+      this.interrupt();
     }
   }
 }
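
The change common to LogRoller, Flusher and CompactSplitThread is the switch from synchronizing on a throwaway Integer to a ReentrantLock, with interruptPolitely() replaced by interruptIfNecessary(): on shutdown the region server interrupts a worker thread only if it can acquire that thread's work lock, i.e. only while the thread is idle rather than mid-roll, mid-flush or mid-compaction. The standalone Java sketch below illustrates the idiom; the names IdleOnlyWorker, doOneUnitOfWork and stopRequested are illustrative and not part of the patch.

    import java.util.concurrent.locks.ReentrantLock;

    /**
     * Sketch of the tryLock-then-interrupt idiom used by this patch. The worker
     * holds workLock for the duration of one unit of work, so an interrupt can
     * only land while the thread is idle (sleeping or polling).
     */
    class IdleOnlyWorker extends Thread {
      private final ReentrantLock workLock = new ReentrantLock();
      private volatile boolean stopRequested = false;

      @Override
      public void run() {
        while (!stopRequested) {
          workLock.lock();   // Don't interrupt us. We're working.
          try {
            doOneUnitOfWork();
          } finally {
            workLock.unlock();
          }
          try {
            Thread.sleep(1000);   // Idle period: safe to be interrupted here.
          } catch (InterruptedException e) {
            // Fall through and re-check stopRequested.
          }
        }
      }

      /** Shutdown path: only interrupt if the worker is not mid-operation. */
      void interruptIfNecessary() {
        stopRequested = true;
        // As in the patch, a lock acquired here is deliberately never released;
        // that is only safe because the worker observes stopRequested and exits
        // without trying to take workLock again.
        if (workLock.tryLock()) {
          this.interrupt();
        }
      }

      private void doOneUnitOfWork() {
        // Placeholder for the real work (log roll, cache flush, compaction).
      }
    }

Compared with the old form, synchronized (lock) { interrupt(); }, which blocked the caller until the worker finished its current pass, tryLock() lets the shutdown sequence skip the interrupt entirely when the worker is busy and move on.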
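
In LogRoller the volatile boolean rollLog and the separate Integer monitor are collapsed into one AtomicBoolean that is used both as the roll-requested flag and as the object waited on. A rough sketch of that wait/notify arrangement, with hypothetical names (RollSignal, awaitRequest, request, clear), looks like this:

    import java.util.concurrent.atomic.AtomicBoolean;

    /**
     * Sketch of using a single AtomicBoolean as both the request flag and the
     * wait/notify monitor, as the patched LogRoller does with rollLog.
     */
    class RollSignal {
      private final AtomicBoolean requested = new AtomicBoolean(false);

      /** Worker thread: wait up to waitMillis for a request (or a timeout). */
      boolean awaitRequest(long waitMillis) throws InterruptedException {
        synchronized (requested) {
          if (!requested.get()) {
            requested.wait(waitMillis);
          }
        }
        return requested.get();
      }

      /** Requesting thread: set the flag and wake the waiter. */
      void request() {
        synchronized (requested) {
          requested.set(true);
          requested.notifyAll();
        }
      }

      /** Worker thread: clear the flag once the work is done. */
      void clear() {
        requested.set(false);
      }
    }

As in the patched run() loop, the caller of awaitRequest is expected to keep re-checking the flag in its own loop; the timed wait doubles as the periodic wake-up that drives the optional log roll.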
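
Flusher's flushImmediately becomes flushRegion(region, removeFromQueue), so the caller decides whether the region should also be pulled out of the flush queue: the normal poll path passes false (the region already came off the queue), while the low-memory path that flushes the biggest memcache passes true, limiting double-flushing of regions (HBASE-512). A simplified sketch of just that queue/set bookkeeping, with hypothetical names (FlushScheduler, request, beforeFlush) and a generic Region type standing in for HRegion, is below.

    import java.util.HashSet;
    import java.util.Set;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    /**
     * Sketch of the queue/set bookkeeping the patched Flusher uses to avoid
     * flushing the same region twice.
     */
    class FlushScheduler<Region> {
      private final BlockingQueue<Region> flushQueue = new LinkedBlockingQueue<Region>();
      private final Set<Region> regionsInQueue = new HashSet<Region>();

      /** Enqueue a region for flushing unless it is already queued. */
      void request(Region r) {
        synchronized (regionsInQueue) {
          if (regionsInQueue.add(r)) {
            flushQueue.add(r);
          }
        }
      }

      /**
       * Remove a region from the bookkeeping before flushing it. The poll path
       * passes removeFromQueue=false because the region is already off the
       * queue; an out-of-band flush passes true so the queued entry is dropped.
       */
      void beforeFlush(Region r, boolean removeFromQueue) {
        synchronized (regionsInQueue) {
          if (regionsInQueue.remove(r) && removeFromQueue) {
            flushQueue.remove(r);
          }
        }
      }
    }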