Index: conf/hbase-default.xml =================================================================== --- conf/hbase-default.xml (revision 729452) +++ conf/hbase-default.xml (working copy) @@ -225,6 +225,12 @@ + hbase.regionserver.safemode.period + 120000 + Time to wait on regionserver startup before beginning + compactions and memcache flushes. + + hbase.hregion.memcache.flush.size 67108864 Index: src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 729452) +++ src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy) @@ -117,7 +117,9 @@ protected final AtomicBoolean stopRequested = new AtomicBoolean(false); protected final AtomicBoolean quiesced = new AtomicBoolean(false); - + + protected final AtomicBoolean safeMode = new AtomicBoolean(true); + // Go down hard. Used if file system becomes unavailable and also in // debugging and unit tests. protected volatile boolean abortRequested; @@ -197,6 +199,9 @@ final LogRoller logRoller; final LogFlusher logFlusher; + // safemode processing + SafeModeThread safeModeThread; + // flag set after we're done setting up server threads (used for testing) protected volatile boolean isOnline; @@ -717,6 +722,63 @@ return this.fsOk; } + /** + * Thread for toggling safemode after some configurable interval. + */ + private class SafeModeThread extends Thread { + + public void run() { + // first, wait the required interval before turning off safemode + int safemodeInterval = + conf.getInt("hbase.regionserver.safemode.period", 120 * 1000); + try { + Thread.sleep(safemodeInterval); + } catch (InterruptedException ex) { + // turn off safemode and limits on the way out due to some kind of + // abnormal condition so we do not prevent such things as memcache + // flushes and worsen the situation + safeMode.set(false); + compactSplitThread.setLimit(-1); + if (LOG.isDebugEnabled()) { + LOG.debug(this.getName() + " exiting on interrupt"); + } + return; + } + LOG.info("leaving safe mode"); + safeMode.set(false); + + // now that safemode is off, slowly increase the per-cycle compaction + // limit, finally setting it to unlimited (-1) + int compactionCheckInterval = + conf.getInt("hbase.regionserver.thread.splitcompactcheckfrequency", + 20 * 1000); + final int limitSteps[] = { + 1, 1, 1, 1, + 2, 2, 2, 2, 2, 2, + 3, 3, 3, 3, 3, 3, 3, 3, + 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, + -1 + }; + for (int i = 0; i < limitSteps.length; i++) { + if (LOG.isDebugEnabled()) { + LOG.debug("setting compaction limit to " + limitSteps[i]); + } + compactSplitThread.setLimit(limitSteps[i]); + try { + Thread.sleep(compactionCheckInterval); + } catch (InterruptedException ex) { + // unlimit compactions before exiting + compactSplitThread.setLimit(-1); + if (LOG.isDebugEnabled()) { + LOG.debug(this.getName() + " exiting on interrupt"); + } + return; + } + } + LOG.info("compactions no longer limited"); + } + } + /* * Thread to shutdown the region server in an orderly manner. This thread * is registered as a shutdown hook in the HRegionServer constructor and is @@ -936,6 +998,18 @@ this.infoServer.setAttribute("regionserver", this); this.infoServer.start(); } + + // Set up the safe mode handler if safe mode has been configured. + if (conf.getInt("hbase.regionserver.safemode.period", 0) < 1) { + safeMode.set(false); + compactSplitThread.setLimit(-1); + LOG.debug("skipping safe mode"); + } else { + this.safeModeThread = new SafeModeThread(); + Threads.setDaemonThreadRunning(this.safeModeThread, n + ".safeMode", + handler); + } + // Start Server. This service is like leases in that it internally runs // a thread. this.server.start(); @@ -1841,8 +1915,15 @@ public boolean isStopRequested() { return stopRequested.get(); } - + /** + * @return true if the region server is in safe mode + */ + public boolean isInSafeMode() { + return safeMode.get(); + } + + /** * * @return the configuration */ Index: src/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java =================================================================== --- src/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java (revision 729452) +++ src/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java (working copy) @@ -60,7 +60,9 @@ new LinkedBlockingQueue(); private final HashSet regionsInQueue = new HashSet(); - + + private volatile int limit = 1; + /** @param server */ public CompactSplitThread(HRegionServer server) { super(); @@ -73,9 +75,25 @@ @Override public void run() { + while (!this.server.isStopRequested() && this.server.isInSafeMode()) { + try { + Thread.sleep(this.frequency); + } catch (InterruptedException ex) { + continue; + } + } + int count = 0; while (!this.server.isStopRequested()) { HRegion r = null; try { + if ((limit > 0) && (++count > limit)) { + try { + Thread.sleep(this.frequency); + } catch (InterruptedException ex) { + continue; + } + count = 0; + } r = compactionQueue.poll(this.frequency, TimeUnit.MILLISECONDS); if (r != null && !this.server.isStopRequested()) { synchronized (regionsInQueue) { @@ -195,8 +213,16 @@ // Do not serve the new regions. Let the Master assign them. } - + /** + * Sets the number of compactions allowed per cycle. + * @param limit the number of compactions allowed, or -1 to unlimit + */ + void setLimit(int limit) { + this.limit = limit; + } + + /** * Only interrupt once it's done with a run through the work loop. */ void interruptIfNecessary() { Index: src/java/org/apache/hadoop/hbase/regionserver/MemcacheFlusher.java =================================================================== --- src/java/org/apache/hadoop/hbase/regionserver/MemcacheFlusher.java (revision 729452) +++ src/java/org/apache/hadoop/hbase/regionserver/MemcacheFlusher.java (working copy) @@ -120,6 +120,13 @@ @Override public void run() { + while (!this.server.isStopRequested() && this.server.isInSafeMode()) { + try { + Thread.sleep(threadWakeFrequency); + } catch (InterruptedException ex) { + continue; + } + } while (!server.isStopRequested()) { HRegion r = null; try {