Index: src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (revision 1170377) +++ src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (working copy) @@ -393,7 +393,8 @@ private void connectToPeers() { // Connect to peer cluster first, unless we have to stop - while (!this.stopper.isStopped() && this.currentPeers.size() == 0) { + while (this.isActive() && this.currentPeers.size() == 0) { + try { chooseSinks(); Thread.sleep(this.sleepForRetries); @@ -552,7 +553,7 @@ LOG.warn("Was given 0 edits to ship"); return; } - while (!this.stopper.isStopped()) { + while (this.isActive()) { try { HRegionInterface rrs = getRS(); LOG.debug("Replicating " + currentNbEntries); @@ -579,6 +580,7 @@ } try { boolean down; + // Spin while the slave is down and we're not asked to shutdown/close do { down = isSlaveDown(); if (down) { @@ -588,7 +590,7 @@ chooseSinks(); } } - } while (!this.stopper.isStopped() && down); + } while (this.isActive() && down ); } catch (InterruptedException e) { LOG.debug("Interrupted while trying to contact the peer cluster"); } catch (KeeperException e) { @@ -708,6 +710,10 @@ this.sourceEnabled.set(status); } + private boolean isActive() { + return !this.stopper.isStopped() && this.running; + } + /** * Comparator used to compare logs together based on their start time */