Index: src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java (revision 1178996) +++ src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java (working copy) @@ -354,6 +354,7 @@ LOG.info("Closing the following queue " + id + ", currently have " + sources.size() + " and another " + oldsources.size() + " that were recovered"); + String terminateMessage = "Replication stream was removed by a user"; ReplicationSourceInterface srcToRemove = null; List oldSourcesToDelete = new ArrayList(); @@ -364,6 +365,7 @@ } } for (ReplicationSourceInterface src : oldSourcesToDelete) { + src.terminate(terminateMessage); closeRecoveredQueue((src)); } LOG.info("Number of deleted recovered sources for " + id + ": " @@ -379,7 +381,7 @@ LOG.error("The queue we wanted to close is missing " + id); return; } - srcToRemove.terminate("Replication stream was removed by a user"); + srcToRemove.terminate(terminateMessage); this.sources.remove(srcToRemove); this.zkHelper.deleteSource(id, true); }