diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index fb15ce4..b2cd19d 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -284,6 +284,10 @@ public class ReplicationSource extends Thread sleepMultiplier++; } continue; + } else if (oldPath != null && !oldPath.getName().equals(this.currentPath.getName())) { + this.manager.cleanOldLogs(this.currentPath.getName(), + this.peerId, + this.replicationQueueInfo.isQueueRecovered()); } boolean currentWALisBeingWrittenTo = false; //For WAL files we own (rather than recovered), take a snapshot of whether the diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 8f9ad65..0014069 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -160,15 +160,31 @@ public class ReplicationSourceManager { if (holdLogInZK) { return; } + cleanOldLogs(key, id, queueRecovered); + } + + /** + * Cleans a log file and all older files from ZK. Called when we are sure that a + * log file is closed and has no more entries. + * @param key Path to the log + * @param id id of the peer cluster + * @param queueRecovered Whether this is a recovered queue + */ + public void cleanOldLogs(String key, + String id, + boolean queueRecovered) { synchronized (this.hlogsById) { SortedSet hlogs = this.hlogsById.get(id); - if (!queueRecovered && !hlogs.first().equals(key)) { - SortedSet hlogSet = hlogs.headSet(key); - for (String hlog : hlogSet) { - this.zkHelper.removeLogFromList(hlog, id); - } - hlogSet.clear(); + if (queueRecovered || hlogs.first().equals(key)) { + return; + } + SortedSet hlogSet = hlogs.headSet(key); + LOG.info("Removing " + hlogSet.size() + + " logs in the list: " + hlogSet); + for (String hlog : hlogSet) { + this.zkHelper.removeLogFromList(hlog, id); } + hlogSet.clear(); } }