diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index fb15ce4..dd0cf45 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -141,6 +141,8 @@ public class ReplicationSource extends Thread private MetricsSource metrics; // Handle on the log reader helper private ReplicationHLogReaderManager repLogReader; + // WARN threshold for the number of queued logs, defaults to 2 + private int logQueueWarnThreshold; /** * Instantiation method used by region servers @@ -189,6 +191,7 @@ public class ReplicationSource extends Thread this.fs = fs; this.metrics = new MetricsSource(peerClusterZnode); this.repLogReader = new ReplicationHLogReaderManager(this.fs, this.conf); + this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2); try { this.clusterId = ZKClusterId.getUUIDForCluster(zkHelper.getZookeeperWatcher()); } catch (KeeperException ke) { @@ -225,7 +228,14 @@ public class ReplicationSource extends Thread @Override public void enqueueLog(Path log) { this.queue.put(log); - this.metrics.setSizeOfLogQueue(queue.size()); + + int queueSize = queue.size(); + this.metrics.setSizeOfLogQueue(queueSize); + + // This will log a warning for each new log that gets created above the warn threshold + if (queueSize > logQueueWarnThreshold) { + LOG.warn("Queue size: " + queueSize + " exceeds value of replication.source.log.queue.warn: " + logQueueWarnThreshold); + } } @Override