Index: conf/hbase-default.xml
===================================================================
--- conf/hbase-default.xml (revision 729452)
+++ conf/hbase-default.xml (working copy)
@@ -225,6 +225,12 @@
+ hbase.regionserver.safemode.period
+ 120000
+ Time to wait on regionserver startup before beginning
+ compactions and memcache flushes.
+
+
hbase.hregion.memcache.flush.size
67108864
Index: src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
===================================================================
--- src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 729452)
+++ src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy)
@@ -117,7 +117,9 @@
protected final AtomicBoolean stopRequested = new AtomicBoolean(false);
protected final AtomicBoolean quiesced = new AtomicBoolean(false);
-
+
+ protected final AtomicBoolean safeMode = new AtomicBoolean(true);
+
// Go down hard. Used if file system becomes unavailable and also in
// debugging and unit tests.
protected volatile boolean abortRequested;
@@ -197,6 +199,9 @@
final LogRoller logRoller;
final LogFlusher logFlusher;
+ // safemode processing
+ SafeModeThread safeModeThread;
+
// flag set after we're done setting up server threads (used for testing)
protected volatile boolean isOnline;
@@ -717,6 +722,63 @@
return this.fsOk;
}
+ /**
+ * Thread for toggling safemode after some configurable interval.
+ */
+ private class SafeModeThread extends Thread {
+
+ public void run() {
+ // first, wait the required interval before turning off safemode
+ int safemodeInterval =
+ conf.getInt("hbase.regionserver.safemode.period", 120 * 1000);
+ try {
+ Thread.sleep(safemodeInterval);
+ } catch (InterruptedException ex) {
+ // turn off safemode and limits on the way out due to some kind of
+ // abnormal condition so we do not prevent such things as memcache
+ // flushes and worsen the situation
+ safeMode.set(false);
+ compactSplitThread.setLimit(-1);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(this.getName() + " exiting on interrupt");
+ }
+ return;
+ }
+ LOG.info("leaving safe mode");
+ safeMode.set(false);
+
+ // now that safemode is off, slowly increase the per-cycle compaction
+ // limit, finally setting it to unlimited (-1)
+ int compactionCheckInterval =
+ conf.getInt("hbase.regionserver.thread.splitcompactcheckfrequency",
+ 20 * 1000);
+ final int limitSteps[] = {
+ 1, 1, 1, 1,
+ 2, 2, 2, 2, 2, 2,
+ 3, 3, 3, 3, 3, 3, 3, 3,
+ 4, 4, 4, 4, 4, 4, 4, 4, 4, 4,
+ -1
+ };
+ for (int i = 0; i < limitSteps.length; i++) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("setting compaction limit to " + limitSteps[i]);
+ }
+ compactSplitThread.setLimit(limitSteps[i]);
+ try {
+ Thread.sleep(compactionCheckInterval);
+ } catch (InterruptedException ex) {
+ // unlimit compactions before exiting
+ compactSplitThread.setLimit(-1);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(this.getName() + " exiting on interrupt");
+ }
+ return;
+ }
+ }
+ LOG.info("compactions no longer limited");
+ }
+ }
+
/*
* Thread to shutdown the region server in an orderly manner. This thread
* is registered as a shutdown hook in the HRegionServer constructor and is
@@ -936,6 +998,18 @@
this.infoServer.setAttribute("regionserver", this);
this.infoServer.start();
}
+
+ // Set up the safe mode handler if safe mode has been configured.
+ if (conf.getInt("hbase.regionserver.safemode.period", 0) < 1) {
+ safeMode.set(false);
+ compactSplitThread.setLimit(-1);
+ LOG.debug("skipping safe mode");
+ } else {
+ this.safeModeThread = new SafeModeThread();
+ Threads.setDaemonThreadRunning(this.safeModeThread, n + ".safeMode",
+ handler);
+ }
+
// Start Server. This service is like leases in that it internally runs
// a thread.
this.server.start();
@@ -1841,8 +1915,15 @@
public boolean isStopRequested() {
return stopRequested.get();
}
-
+
/**
+ * @return true if the region server is in safe mode
+ */
+ public boolean isInSafeMode() {
+ return safeMode.get();
+ }
+
+ /**
*
* @return the configuration
*/
Index: src/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
===================================================================
--- src/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java (revision 729452)
+++ src/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java (working copy)
@@ -60,7 +60,9 @@
new LinkedBlockingQueue();
private final HashSet regionsInQueue = new HashSet();
-
+
+ private volatile int limit = 1;
+
/** @param server */
public CompactSplitThread(HRegionServer server) {
super();
@@ -73,9 +75,25 @@
@Override
public void run() {
+ while (!this.server.isStopRequested() && this.server.isInSafeMode()) {
+ try {
+ Thread.sleep(this.frequency);
+ } catch (InterruptedException ex) {
+ continue;
+ }
+ }
+ int count = 0;
while (!this.server.isStopRequested()) {
HRegion r = null;
try {
+ if ((limit > 0) && (++count > limit)) {
+ try {
+ Thread.sleep(this.frequency);
+ } catch (InterruptedException ex) {
+ continue;
+ }
+ count = 0;
+ }
r = compactionQueue.poll(this.frequency, TimeUnit.MILLISECONDS);
if (r != null && !this.server.isStopRequested()) {
synchronized (regionsInQueue) {
@@ -195,8 +213,16 @@
// Do not serve the new regions. Let the Master assign them.
}
-
+
/**
+ * Sets the number of compactions allowed per cycle.
+ * @param limit the number of compactions allowed, or -1 to unlimit
+ */
+ void setLimit(int limit) {
+ this.limit = limit;
+ }
+
+ /**
* Only interrupt once it's done with a run through the work loop.
*/
void interruptIfNecessary() {
Index: src/java/org/apache/hadoop/hbase/regionserver/MemcacheFlusher.java
===================================================================
--- src/java/org/apache/hadoop/hbase/regionserver/MemcacheFlusher.java (revision 729452)
+++ src/java/org/apache/hadoop/hbase/regionserver/MemcacheFlusher.java (working copy)
@@ -120,6 +120,13 @@
@Override
public void run() {
+ while (!this.server.isStopRequested() && this.server.isInSafeMode()) {
+ try {
+ Thread.sleep(threadWakeFrequency);
+ } catch (InterruptedException ex) {
+ continue;
+ }
+ }
while (!server.isStopRequested()) {
HRegion r = null;
try {