commit 1c599c3abf472c6fe90bbdd441be6bd47eca875c Author: Todd Lipcon Date: Wed Apr 14 12:25:19 2010 -0700 HBASE-2448. Fix triggering of scanners to not actually interrupt potential DFS operations in progress diff --git src/java/org/apache/hadoop/hbase/Chore.java src/java/org/apache/hadoop/hbase/Chore.java index 06a97d7..03c4e8c 100644 --- src/java/org/apache/hadoop/hbase/Chore.java +++ src/java/org/apache/hadoop/hbase/Chore.java @@ -84,6 +84,16 @@ public abstract class Chore extends Thread { } /** + * If the thread is currently sleeping, trigger the core to happen immediately. + * If it's in the middle of its operation, will begin another operation + * immediately after finishing this one. + */ + public void triggerNow() { + LOG.info("triggerNow()", new Exception("trace")); + this.sleeper.skipSleepCycle(); + } + + /** * Override to run a task before we start looping. * @return true if initial chore was successful */ diff --git src/java/org/apache/hadoop/hbase/master/BaseScanner.java src/java/org/apache/hadoop/hbase/master/BaseScanner.java index 74cc303..de9dcca 100644 --- src/java/org/apache/hadoop/hbase/master/BaseScanner.java +++ src/java/org/apache/hadoop/hbase/master/BaseScanner.java @@ -589,9 +589,10 @@ abstract class BaseScanner extends Chore implements HConstants { } /** - * Notify the thread to die at the end of its next run + * Interrupt thread regardless of what it's doing */ - public void interruptIfAlive() { + public void interruptAndStop() { + LOG.info("interruptAndStop()", new Exception("trace")); synchronized(scannerLock){ if (isAlive()) { super.interrupt(); diff --git src/java/org/apache/hadoop/hbase/master/ModifyTableMeta.java src/java/org/apache/hadoop/hbase/master/ModifyTableMeta.java index 6e44b50..d066739 100644 --- src/java/org/apache/hadoop/hbase/master/ModifyTableMeta.java +++ src/java/org/apache/hadoop/hbase/master/ModifyTableMeta.java @@ -73,6 +73,6 @@ class ModifyTableMeta extends TableOperation { updateRegionInfo(server, m.getRegionName(), i); } // kick off a meta scan right away - master.regionManager.metaScannerThread.interrupt(); + master.regionManager.metaScannerThread.triggerNow(); } } diff --git src/java/org/apache/hadoop/hbase/master/ProcessRegionOpen.java src/java/org/apache/hadoop/hbase/master/ProcessRegionOpen.java index 20a4492..8a5c82a 100644 --- src/java/org/apache/hadoop/hbase/master/ProcessRegionOpen.java +++ src/java/org/apache/hadoop/hbase/master/ProcessRegionOpen.java @@ -100,7 +100,7 @@ class ProcessRegionOpen extends ProcessRegionStatusChange { master.regionManager.putMetaRegionOnline(m); // Interrupting the Meta Scanner sleep so that it can // process regions right away - master.regionManager.metaScannerThread.interrupt(); + master.regionManager.metaScannerThread.triggerNow(); } } // If updated successfully, remove from pending list if the state diff --git src/java/org/apache/hadoop/hbase/master/RegionManager.java src/java/org/apache/hadoop/hbase/master/RegionManager.java index d55658f..a3d6563 100644 --- src/java/org/apache/hadoop/hbase/master/RegionManager.java +++ src/java/org/apache/hadoop/hbase/master/RegionManager.java @@ -596,11 +596,11 @@ class RegionManager implements HConstants { if (LOG.isDebugEnabled()) { LOG.debug("telling root scanner to stop"); } - rootScannerThread.interruptIfAlive(); + rootScannerThread.interruptAndStop(); if (LOG.isDebugEnabled()) { LOG.debug("telling meta scanner to stop"); } - metaScannerThread.interruptIfAlive(); + metaScannerThread.interruptAndStop(); if (LOG.isDebugEnabled()) { LOG.debug("meta and root scanners notified"); } diff --git src/java/org/apache/hadoop/hbase/util/Sleeper.java src/java/org/apache/hadoop/hbase/util/Sleeper.java index d556afe..0e817ff 100644 --- src/java/org/apache/hadoop/hbase/util/Sleeper.java +++ src/java/org/apache/hadoop/hbase/util/Sleeper.java @@ -35,7 +35,10 @@ public class Sleeper { private final int period; private AtomicBoolean stop; private static final long MINIMAL_DELTA_FOR_LOGGING = 10000; - + + private final Object sleepLock = new Object(); + private boolean triggerWake = false; + /** * @param sleep * @param stop @@ -51,6 +54,17 @@ public class Sleeper { public void sleep() { sleep(System.currentTimeMillis()); } + + /** + * If currently asleep, stops sleeping; if not asleep, will skip the next + * sleep cycle. + */ + public void skipSleepCycle() { + synchronized (sleepLock) { + triggerWake = true; + sleepLock.notify(); + } + } /** * Sleep for period adjusted by passed startTime @@ -72,7 +86,10 @@ public class Sleeper { while (waitTime > 0) { long woke = -1; try { - Thread.sleep(waitTime); + synchronized (sleepLock) { + if (triggerWake) break; + sleepLock.wait(waitTime); + } woke = System.currentTimeMillis(); long slept = woke - now; if (slept - this.period > MINIMAL_DELTA_FOR_LOGGING) { @@ -92,5 +109,6 @@ public class Sleeper { woke = (woke == -1)? System.currentTimeMillis(): woke; waitTime = this.period - (woke - startTime); } + triggerWake = false; } }