# Patch summary (base: revision 1504364):
#  * ReplicationSource: when the source has moved on to a log whose name differs from the
#    previous one, call ReplicationSourceManager.cleanOldLogs for the new current log.
#  * ReplicationSourceManager: the clean-up at the end of the enclosing method is extracted
#    into a new public cleanOldLogs(key, id, queueRecovered); the key comparison now uses
#    equals() instead of !=, and the call is a no-op when the peer has no tracked logs.
# NOTE(review): leading whitespace of context/removed lines should be verified against the
# base revision before applying (or apply with whitespace-insensitive matching, patch -l).
Index: src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java	(revision 1504364)
+++ src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java	(working copy)
@@ -352,6 +352,8 @@
           sleepMultiplier++;
         }
         continue;
+      } else if (oldPath != null && !oldPath.getName().equals(getCurrentPath().getName())) {
+        this.manager.cleanOldLogs(getCurrentPath().getName(), this.peerId, this.queueRecovered);
       }
       boolean currentWALisBeingWrittenTo = false;
       //For WAL files we own (rather than recovered), take a snapshot of whether the
Index: src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java	(revision 1504364)
+++ src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java	(working copy)
@@ -161,17 +161,30 @@
     if (holdLogInZK) {
       return;
     }
+    cleanOldLogs(key, id, queueRecovered);
+  }
+
+  /**
+   * Removes from ZK, and from the in-memory set, the logs of a peer that sort before the given
+   * log; the given log itself is kept. Called when we are sure that a log file is closed and has
+   * no more entries. Does nothing for a recovered queue or when the peer has no tracked logs.
+   * @param key Name of the log; only logs sorting strictly before it are removed
+   * @param id id of the peer cluster
+   * @param queueRecovered Whether this is a recovered queue
+   */
+  public void cleanOldLogs(String key,
+                           String id,
+                           boolean queueRecovered) {
     synchronized (this.hlogsById) {
       SortedSet<String> hlogs = this.hlogsById.get(id);
-      if (!queueRecovered && hlogs.first() != key) {
-        SortedSet<String> hlogSet = hlogs.headSet(key);
-        LOG.info("Removing " + hlogSet.size() +
-            " logs in the list: " + hlogSet);
-        for (String hlog : hlogSet) {
-          this.zkHelper.removeLogFromList(hlog, id);
-        }
-        hlogSet.clear();
+      if (queueRecovered || hlogs == null || hlogs.isEmpty() || hlogs.first().equals(key)) {
+        return;
       }
+      SortedSet<String> hlogSet = hlogs.headSet(key);
+      for (String hlog : hlogSet) {
+        this.zkHelper.removeLogFromList(hlog, id);
+      }
+      hlogSet.clear();
     }
   }
 