diff --git src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index c00f058..69083eb 100644 --- src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -782,6 +782,13 @@ public class ReplicationSource extends Thread */ protected boolean processEndOfFile() { if (this.queue.size() != 0) { + // We only dequeue another WAL if currentPath is null and this is the only + // place where it is being set of null. We are here if either an error + // happened and we gave up on reading the WAL or a new WAL was enqueued + // and we had reached the end of the current WAL. + if (!this.queueRecovered) { + this.manager.cleanOldLogs(currentPath, peerId); + } this.currentPath = null; this.repLogReader.finishCurrentFile(); this.reader = null; diff --git src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 51d0e61..597483d 100644 --- src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -176,6 +176,26 @@ public class ReplicationSourceManager { } /** + * Cleans a log file and all older files from ZK. Called when we are sure that a + * log file is closed and has no more entries. + * @param log Path to the log + * @param id id of the peer cluster + */ + public void cleanOldLogs(Path log, String id) { + String key = log.getName(); + LOG.info("Cleaning logs older than " + key); + SortedSet hlogs = hlogsById.get(id); + SortedSet hlogSet = hlogs.headSet(key); + for (String hlog : hlogSet) { + this.zkHelper.removeLogFromList(hlog, id); + } + hlogSet.clear(); + this.zkHelper.removeLogFromList(key, id); + hlogs.remove(key); + LOG.info("Removed logs from zookeeper : " + hlogSet + "," + key); + } + + /** * Adds a normal source per registered peer cluster and tries to process all * old region server hlog queues */