Index: src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (revision 1171280) +++ src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (working copy) @@ -230,7 +230,7 @@ public void run() { connectToPeers(); // We were stopped while looping to connect to sinks, just abort - if (this.stopper.isStopped()) { + if (!this.isActive()) { return; } // If this is recovered, the queue is already full and the first log @@ -246,7 +246,7 @@ } int sleepMultiplier = 1; // Loop until we close down - while (!stopper.isStopped() && this.running) { + while (isActive()) { // Sleep until replication is enabled again if (!this.replicating.get() || !this.sourceEnabled.get()) { if (sleepForRetries("Replication is disabled", sleepMultiplier)) { @@ -329,7 +329,7 @@ // If we didn't get anything to replicate, or if we hit a IOE, // wait a bit and retry. // But if we need to stop, don't bother sleeping - if (!stopper.isStopped() && (gotIOE || currentNbEntries == 0)) { + if (this.isActive() && (gotIOE || currentNbEntries == 0)) { this.manager.logPositionAndCleanOldLogs(this.currentPath, this.peerClusterZnode, this.position, queueRecovered); if (sleepForRetries("Nothing to replicate", sleepMultiplier)) { @@ -393,7 +393,8 @@ private void connectToPeers() { // Connect to peer cluster first, unless we have to stop - while (!this.stopper.isStopped() && this.currentPeers.size() == 0) { + while (this.isActive() && this.currentPeers.size() == 0) { + try { chooseSinks(); Thread.sleep(this.sleepForRetries); @@ -552,7 +553,7 @@ LOG.warn("Was given 0 edits to ship"); return; } - while (!this.stopper.isStopped()) { + while (this.isActive()) { try { HRegionInterface rrs = getRS(); LOG.debug("Replicating " + currentNbEntries); @@ -579,6 +580,7 @@ } try { boolean down; + // Spin while the slave is down and we're not asked to shutdown/close do { down = isSlaveDown(); if (down) { @@ -588,7 +590,7 @@ chooseSinks(); } } - } while (!this.stopper.isStopped() && down); + } while (this.isActive() && down ); } catch (InterruptedException e) { LOG.debug("Interrupted while trying to contact the peer cluster"); } catch (KeeperException e) { @@ -708,6 +710,10 @@ this.sourceEnabled.set(status); } + private boolean isActive() { + return !this.stopper.isStopped() && this.running; + } + /** * Comparator used to compare logs together based on their start time */