diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 7973a40..b4ba497 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -131,6 +131,8 @@ public class ReplicationSource extends Thread private ReplicationHLogReaderManager repLogReader; // Handles connecting to peer region servers private ReplicationSinkManager replicationSinkMgr; + //WARN threshold for the number of queued logs, defaults to 2 + private int logQueueWarnThreshold; /** * Instantiation method used by region servers @@ -182,13 +184,19 @@ public class ReplicationSource extends Thread // ReplicationQueueInfo parses the peerId out of the znode for us this.peerId = this.replicationQueueInfo.getPeerId(); this.replicationSinkMgr = new ReplicationSinkManager(conn, peerId, replicationPeers, conf); + this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2); } @Override public void enqueueLog(Path log) { this.queue.put(log); - this.metrics.setSizeOfLogQueue(queue.size()); + int queueSize = queue.size(); + this.metrics.setSizeOfLogQueue(queueSize); + // This will log a warning for each new log that gets created above the warn threshold + if (queueSize > this.logQueueWarnThreshold) { + LOG.warn("Queue size: " + queueSize + " exceeds value of replication.source.log.queue.warn: " + logQueueWarnThreshold); + } } @Override