diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index fb15ce4..a128f25 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -726,6 +726,13 @@ public class ReplicationSource extends Thread protected boolean processEndOfFile() { if (this.queue.size() != 0) { this.currentPath = null; + // We only dequeue another WAL if currentPath is null and this is the only + // place where it is being set of null. We are here if either an error + // happened and we gave up on reading the WAL or a new WAL was enqueued + // and we had reached the end of the current WAL. + if (!this.replicationQueueInfo.isQueueRecovered()) { + this.manager.cleanOldLogs(currentPath, peerId); + } this.repLogReader.finishCurrentFile(); this.reader = null; return true; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 8f9ad65..82e9bdf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -173,6 +173,26 @@ public class ReplicationSourceManager { } /** + * Cleans a log file and all older files from ZK. Called when we are sure that a + * log file is closed and has no more entries. + * @param log Path to the log + * @param id id of the peer cluster + */ + public void cleanOldLogs(Path log, String id) { + String key = log.getName(); + LOG.info("Cleaning logs older than " + key); + SortedSet hlogs = hlogsById.get(id); + SortedSet hlogSet = hlogs.headSet(key); + for (String hlog : hlogSet) { + this.zkHelper.removeLogFromList(hlog, id); + } + hlogSet.clear(); + this.zkHelper.removeLogFromList(key, id); + hlogs.remove(key); + LOG.info("Removed logs from zookeeper : " + hlogSet + "," + key); + } + + /** * Adds a normal source per registered peer cluster and tries to process all * old region server hlog queues */