Index: conf/hbase-default.xml
===================================================================
--- conf/hbase-default.xml (revision 729358)
+++ conf/hbase-default.xml (working copy)
@@ -225,6 +225,12 @@
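+    <name>hbase.regionserver.safemode.period</name>
+    <value>120000</value>
+    <description>Time to wait on regionserver startup before beginning
+    compactions and memcache flushes.</description>
+  </property>
+  <property>
     <name>hbase.hregion.memcache.flush.size</name>
     <value>67108864</value>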
Index: src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
===================================================================
--- src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 729358)
+++ src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy)
@@ -117,7 +117,9 @@
protected final AtomicBoolean stopRequested = new AtomicBoolean(false);
protected final AtomicBoolean quiesced = new AtomicBoolean(false);
-
+
+ protected final AtomicBoolean safeMode = new AtomicBoolean(true);
+
// Go down hard. Used if file system becomes unavailable and also in
// debugging and unit tests.
protected volatile boolean abortRequested;
@@ -433,8 +435,8 @@
checkFileSystem();
}
if (this.stopRequested.get()) {
- LOG.info("Stop was requested, clearing the toDo " +
- "despite of the exception");
+ LOG.info("Stop was requested, clearing the toDo " +
+ "despite of the exception");
toDo.clear();
continue;
}
@@ -588,6 +590,14 @@
this.logFlusher.setHLog(log);
// Init in here rather than in constructor after thread name has been set
this.metrics = new RegionServerMetrics();
+ // start thread for turning off safemode
+ if (conf.getInt("hbase.regionserver.safemode.period", 0) < 1) {
+ safeMode.set(false);
+ compactSplitThread.setLimit(-1);
+ LOG.debug("skipping safe mode");
+ } else {
+ new SafemodeThread().start();
+ }
startServiceThreads();
isOnline = true;
} catch (Throwable e) {
@@ -717,7 +727,70 @@
return this.fsOk;
}
+ /**
+  * Thread for toggling safemode after some configurable interval.
+  *
+  * It sleeps for <code>hbase.regionserver.safemode.period</code>
+  * milliseconds, clears the safeMode flag, and then raises the per-cycle
+  * compaction limit of compactSplitThread step by step until it is
+  * unlimited (-1). If interrupted at any point it lifts all restrictions
+  * and exits.
+  */
+ private class SafemodeThread extends Thread {
+
+   @Override
+   public void start() {
+     // make this thread a daemon so it will not delay any shutdown
+     this.setDaemon(true);
+     super.start();
+   }
+
+   @Override
+   public void run() {
+     // first, wait the required interval before turning off safemode
+     int safemodeInterval =
+       conf.getInt("hbase.regionserver.safemode.period", 120 * 1000);
+     try {
+       Thread.sleep(safemodeInterval);
+     } catch (InterruptedException ex) {
+       // turn off safemode and limits on the way out due to some kind of
+       // abnormal condition so we do not prevent such things as memcache
+       // flushes and worsen the situation
+       safeMode.set(false);
+       compactSplitThread.setLimit(-1);
+       if (LOG.isDebugEnabled()) {
+         LOG.debug(this.getName() + " exiting on interrupt");
+       }
+       return;
+     }
+     LOG.info("leaving safe mode");
+     safeMode.set(false);
+
+     // now that safemode is off, slowly increase the per-cycle compaction
+     // limit, finally setting it to unlimited (-1)
+     int compactionCheckInterval =
+       conf.getInt("hbase.regionserver.thread.splitcompactcheckfrequency",
+           20 * 1000);
+     final int[] limitSteps = {
+       1, 1, 1, 1,
+       2, 2, 2, 2, 2, 2,
+       3, 3, 3, 3, 3, 3, 3, 3,
+       4, 4, 4, 4, 4, 4, 4, 4, 4, 4,
+       -1
+     };
+     for (int i = 0; i < limitSteps.length; i++) {
+       if (LOG.isDebugEnabled()) {
+         LOG.debug("setting compaction limit to " + limitSteps[i]);
+       }
+       compactSplitThread.setLimit(limitSteps[i]);
+       if (i == limitSteps.length - 1) {
+         // the final step already lifted the limit; sleeping another
+         // interval would only delay this thread's exit
+         break;
+       }
+       try {
+         Thread.sleep(compactionCheckInterval);
+       } catch (InterruptedException ex) {
+         // unlimit compactions before exiting
+         compactSplitThread.setLimit(-1);
+         if (LOG.isDebugEnabled()) {
+           LOG.debug(this.getName() + " exiting on interrupt");
+         }
+         return;
+       }
+     }
+     LOG.info("compactions no longer limited");
+   }
+ }
+
+
/*
* Thread to shutdown the region server in an orderly manner. This thread
* is registered as a shutdown hook in the HRegionServer constructor and is
@@ -1841,8 +1914,15 @@
public boolean isStopRequested() {
return stopRequested.get();
}
-
+
/**
+  * Reports whether this region server is still in its startup safe mode.
+  *
+  * @return the current value of the safeMode flag: true while the server
+  * is in safe mode, false once it has been cleared
+  */
+ public boolean isInSafeMode() {
+   return this.safeMode.get();
+ }
+
+ /**
*
* @return the configuration
*/
Index: src/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
===================================================================
--- src/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java (revision 729358)
+++ src/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java (working copy)
@@ -60,7 +60,9 @@
new LinkedBlockingQueue();
private final HashSet regionsInQueue = new HashSet();
-
+
+ // Number of compactions allowed per cycle; -1 means unlimited. The value
+ // is changed through setLimit() from a thread other than the one executing
+ // run(), so it is volatile to guarantee the reader sees each update.
+ private volatile int limit = 1;
+
/** @param server */
public CompactSplitThread(HRegionServer server) {
super();
@@ -73,9 +75,26 @@
@Override
public void run() {
+ while (!this.server.isStopRequested() && this.server.isInSafeMode()) {
+ try {
+ LOG.debug("in safe mode, deferring compactions");
+ Thread.sleep(this.frequency);
+ } catch (InterruptedException ex) {
+ continue;
+ }
+ }
+ int count = 0;
while (!this.server.isStopRequested()) {
HRegion r = null;
try {
+ if ((limit > 0) && (++count > limit)) {
+ try {
+ Thread.sleep(this.frequency);
+ } catch (InterruptedException ex) {
+ continue;
+ }
+ count = 0;
+ }
r = compactionQueue.poll(this.frequency, TimeUnit.MILLISECONDS);
if (r != null && !this.server.isStopRequested()) {
synchronized (regionsInQueue) {
@@ -195,8 +214,16 @@
// Do not serve the new regions. Let the Master assign them.
}
-
+
/**
+  * Changes how many compactions this thread may run per cycle.
+  *
+  * @param limit the new per-cycle compaction count; pass -1 to remove the
+  * limit entirely
+  */
+ void setLimit(int limit) {
+   this.limit = limit;
+ }
+
+
+ /**
* Only interrupt once it's done with a run through the work loop.
*/
void interruptIfNecessary() {
Index: src/java/org/apache/hadoop/hbase/regionserver/MemcacheFlusher.java
===================================================================
--- src/java/org/apache/hadoop/hbase/regionserver/MemcacheFlusher.java (revision 729358)
+++ src/java/org/apache/hadoop/hbase/regionserver/MemcacheFlusher.java (working copy)
@@ -120,7 +120,15 @@
@Override
public void run() {
+ while (!this.server.isStopRequested() && this.server.isInSafeMode()) {
+ try {
+ LOG.debug("in safe mode, deferring memcache flushes");
+ Thread.sleep(threadWakeFrequency);
+ } catch (InterruptedException ex) {
+ continue;
+ }
+ }
while (!server.isStopRequested()) {
HRegion r = null;
try {