Index: conf/hbase-default.xml =================================================================== --- conf/hbase-default.xml (revision 729358) +++ conf/hbase-default.xml (working copy) @@ -225,6 +225,12 @@ + hbase.regionserver.safemode.period + 120000 + Time to wait on regionserver startup before beginning + compactions and memcache flushes. + + hbase.hregion.memcache.flush.size 67108864 Index: src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 729358) +++ src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy) @@ -117,7 +117,9 @@ protected final AtomicBoolean stopRequested = new AtomicBoolean(false); protected final AtomicBoolean quiesced = new AtomicBoolean(false); - + + protected final AtomicBoolean safeMode = new AtomicBoolean(true); + // Go down hard. Used if file system becomes unavailable and also in // debugging and unit tests. protected volatile boolean abortRequested; @@ -433,8 +435,8 @@ checkFileSystem(); } if (this.stopRequested.get()) { - LOG.info("Stop was requested, clearing the toDo " + - "despite of the exception"); + LOG.info("Stop was requested, clearing the toDo " + + "despite of the exception"); toDo.clear(); continue; } @@ -588,6 +590,14 @@ this.logFlusher.setHLog(log); // Init in here rather than in constructor after thread name has been set this.metrics = new RegionServerMetrics(); + // start thread for turning off safemode + if (conf.getInt("hbase.regionserver.safemode.period", 0) < 1) { + safeMode.set(false); + compactSplitThread.setLimit(-1); + LOG.debug("skipping safe mode"); + } else { + new SafemodeThread().start(); + } startServiceThreads(); isOnline = true; } catch (Throwable e) { @@ -717,7 +727,70 @@ return this.fsOk; } + /** + * Thread for toggling safemode after some configurable interval. + */ + private class SafemodeThread extends Thread { + + public void start() { + // make this thread a daemon so it will not delay any shutdown + this.setDaemon(true); + super.start(); + } + + public void run() { + // first, wait the required interval before turning off safemode + int safemodeInterval = + conf.getInt("hbase.regionserver.safemode.period", 120 * 1000); + try { + Thread.sleep(safemodeInterval); + } catch (InterruptedException ex) { + // turn off safemode and limits on the way out due to some kind of + // abnormal condition so we do not prevent such things as memcache + // flushes and worsen the situation + safeMode.set(false); + compactSplitThread.setLimit(-1); + if (LOG.isDebugEnabled()) { + LOG.debug(this.getName() + " exiting on interrupt"); + } + return; + } + LOG.info("leaving safe mode"); + safeMode.set(false); + + // now that safemode is off, slowly increase the per-cycle compaction + // limit, finally setting it to unlimited (-1) + int compactionCheckInterval = + conf.getInt("hbase.regionserver.thread.splitcompactcheckfrequency", + 20 * 1000); + final int limitSteps[] = { + 1, 1, 1, 1, + 2, 2, 2, 2, 2, 2, + 3, 3, 3, 3, 3, 3, 3, 3, + 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, + -1 + }; + for (int i = 0; i < limitSteps.length; i++) { + if (LOG.isDebugEnabled()) { + LOG.debug("setting compaction limit to " + limitSteps[i]); + } + compactSplitThread.setLimit(limitSteps[i]); + try { + Thread.sleep(compactionCheckInterval); + } catch (InterruptedException ex) { + // unlimit compactions before exiting + compactSplitThread.setLimit(-1); + if (LOG.isDebugEnabled()) { + LOG.debug(this.getName() + " exiting on interrupt"); + } + return; + } + } + LOG.info("compactions no longer limited"); + } + } + /* * Thread to shutdown the region server in an orderly manner. This thread * is registered as a shutdown hook in the HRegionServer constructor and is @@ -1841,8 +1914,15 @@ public boolean isStopRequested() { return stopRequested.get(); } - + /** + * @return true if the region server is in safe mode + */ + public boolean isInSafeMode() { + return safeMode.get(); + } + + /** * * @return the configuration */ Index: src/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java =================================================================== --- src/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java (revision 729358) +++ src/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java (working copy) @@ -60,7 +60,9 @@ new LinkedBlockingQueue(); private final HashSet regionsInQueue = new HashSet(); - + + private int limit = 1; + /** @param server */ public CompactSplitThread(HRegionServer server) { super(); @@ -73,9 +75,26 @@ @Override public void run() { + while (!this.server.isStopRequested() && this.server.isInSafeMode()) { + try { + LOG.debug("in safe mode, deferring compactions"); + Thread.sleep(this.frequency); + } catch (InterruptedException ex) { + continue; + } + } + int count = 0; while (!this.server.isStopRequested()) { HRegion r = null; try { + if ((limit > 0) && (++count > limit)) { + try { + Thread.sleep(this.frequency); + } catch (InterruptedException ex) { + continue; + } + count = 0; + } r = compactionQueue.poll(this.frequency, TimeUnit.MILLISECONDS); if (r != null && !this.server.isStopRequested()) { synchronized (regionsInQueue) { @@ -195,8 +214,16 @@ // Do not serve the new regions. Let the Master assign them. } - + /** + * Sets the number of compactions allowed per cycle. + * @param limit the number of compactions allowed, or -1 to unlimit + */ + void setLimit(int limit) { + this.limit = limit; + } + + /** * Only interrupt once it's done with a run through the work loop. */ void interruptIfNecessary() { Index: src/java/org/apache/hadoop/hbase/regionserver/MemcacheFlusher.java =================================================================== --- src/java/org/apache/hadoop/hbase/regionserver/MemcacheFlusher.java (revision 729358) +++ src/java/org/apache/hadoop/hbase/regionserver/MemcacheFlusher.java (working copy) @@ -120,7 +120,15 @@ @Override public void run() { + while (!this.server.isStopRequested() && this.server.isInSafeMode()) { + try { + LOG.debug("in safe mode, deferring memcache flushes"); + Thread.sleep(threadWakeFrequency); + } catch (InterruptedException ex) { + continue; + } + } while (!server.isStopRequested()) { HRegion r = null; try {