Index: src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (revision 1170293) +++ src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (working copy) @@ -242,7 +242,7 @@ public void run() { connectToPeers(); // We were stopped while looping to connect to sinks, just abort - if (this.stopper.isStopped()) { + if (!shouldRun()) { return; } // delay this until we are in an asynchronous thread @@ -267,7 +267,7 @@ } int sleepMultiplier = 1; // Loop until we close down - while (!stopper.isStopped() && this.running) { + while (shouldRun()) { // Sleep until replication is enabled again if (!this.replicating.get() || !this.sourceEnabled.get()) { if (sleepForRetries("Replication is disabled", sleepMultiplier)) { @@ -350,7 +350,7 @@ // If we didn't get anything to replicate, or if we hit a IOE, // wait a bit and retry. // But if we need to stop, don't bother sleeping - if (!stopper.isStopped() && (gotIOE || currentNbEntries == 0)) { + if (shouldRun() && (gotIOE || currentNbEntries == 0)) { this.manager.logPositionAndCleanOldLogs(this.currentPath, this.peerClusterZnode, this.position, queueRecovered); if (sleepForRetries("Nothing to replicate", sleepMultiplier)) { @@ -430,7 +430,7 @@ private void connectToPeers() { // Connect to peer cluster first, unless we have to stop - while (!this.stopper.isStopped() && this.currentPeers.size() == 0) { + while (shouldRun() && this.currentPeers.size() == 0) { try { chooseSinks(); Thread.sleep(this.sleepForRetries); @@ -562,6 +562,13 @@ } /** + * @return true if this Source should continue to run + */ + private boolean shouldRun() { + return !stopper.isStopped() && running; + } + + /** * Count the number of different row keys in the given edit because of * mini-batching. 
We assume that there's at least one KV in the WALEdit. * @param edit edit to count row keys from @@ -588,7 +595,7 @@ LOG.warn("Was given 0 edits to ship"); return; } - while (!this.stopper.isStopped()) { + while (shouldRun()) { try { HRegionInterface rrs = getRS(); LOG.debug("Replicating " + currentNbEntries); @@ -624,7 +631,7 @@ chooseSinks(); } } - } while (!this.stopper.isStopped() && down); + } while (shouldRun() && down); } catch (InterruptedException e) { LOG.debug("Interrupted while trying to contact the peer cluster"); } catch (KeeperException e) {