diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index c865490..c567255 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -282,6 +282,10 @@ public class ReplicationSource extends Thread sleepMultiplier++; } continue; + } else if (oldPath != null && !oldPath.getName().equals(this.currentPath.getName())) { + this.manager.cleanOldLogs(this.currentPath.getName(), + this.peerId, + this.replicationQueueInfo.isQueueRecovered()); } boolean currentWALisBeingWrittenTo = false; //For WAL files we own (rather than recovered), take a snapshot of whether the diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index ba70552..f33e3a8 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -155,15 +155,29 @@ public class ReplicationSourceManager { if (holdLogInZK) { return; } + cleanOldLogs(key, id, queueRecovered); + } + + /** + * Cleans a log file and all older files from ZK. Called when we are sure that a + * log file is closed and has no more entries. + * @param key Path to the log + * @param id id of the peer cluster + * @param queueRecovered Whether this is a recovered queue + */ + public void cleanOldLogs(String key, + String id, + boolean queueRecovered) { synchronized (this.hlogsById) { SortedSet hlogs = this.hlogsById.get(id); - if (!queueRecovered && !hlogs.first().equals(key)) { - SortedSet hlogSet = hlogs.headSet(key); - for (String hlog : hlogSet) { - this.zkHelper.removeLogFromList(hlog, id); - } - hlogSet.clear(); + if (queueRecovered || hlogs.first().equals(key)) { + return; + } + SortedSet hlogSet = hlogs.headSet(key); + for (String hlog : hlogSet) { + this.zkHelper.removeLogFromList(hlog, id); } + hlogSet.clear(); } }