Index: src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java (revision 1386908) +++ src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java (working copy) @@ -47,6 +47,7 @@ import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; /** * This class is responsible for replicating the edits coming @@ -71,6 +72,7 @@ private final ExecutorService sharedThreadPool; private final HConnection sharedHtableCon; private final ReplicationSinkMetrics metrics; + private final AtomicLong totalReplicatedEdits = new AtomicLong(); /** * Create a sink for replication @@ -156,7 +158,7 @@ this.metrics.setAgeOfLastAppliedOp( entries[entries.length-1].getKey().getWriteTime()); this.metrics.appliedBatchesRate.inc(1); - LOG.info("Total replicated: " + totalReplicated); + this.totalReplicatedEdits.addAndGet(totalReplicated); } catch (IOException ex) { LOG.error("Unable to accept edit because:", ex); throw ex; @@ -224,4 +226,15 @@ } } } + + /** + * Get a string representation of this sink's metrics + * @return string with the total replicated edits count and the date + * of the last edit that was applied + */ + public String getStats() { + return this.totalReplicatedEdits.get() == 0 ? "" : "Sink: " + + "age in ms of last applied edit: " + this.metrics.refreshAgeOfLastAppliedOp() + + ", total replicated edits: " + this.totalReplicatedEdits; + } } Index: src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java (revision 1386908) +++ src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java (working copy) @@ -93,4 +93,11 @@ */ public String getPeerClusterId(); + /** + * Get a string representation of the current statistics + * for this source + * @return printable stats + */ + public String getStats(); + } Index: src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (revision 1386908) +++ src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (working copy) @@ -432,9 +432,6 @@ break; } } - LOG.debug("currentNbOperations:" + currentNbOperations + - " and seenEntries:" + seenEntries + - " and size: " + (this.reader.getPosition() - this.position)); // If we didn't get anything and the queue has an object, it means we // hit the end of the file for sure return seenEntries == 0 && processEndOfFile(); @@ -477,8 +474,6 @@ */ protected boolean openReader(int sleepMultiplier) { try { - LOG.debug("Opening log for replication " + this.currentPath.getName() + - " at " + this.position); try { this.reader = null; this.reader = HLog.getReader(this.fs, this.currentPath, this.conf); @@ -613,7 +608,6 @@ } try { HRegionInterface rrs = getRS(); - LOG.debug("Replicating " + currentNbEntries); rrs.replicateLogEntries(Arrays.copyOf(this.entriesArray, currentNbEntries)); if (this.lastLoggedPosition != this.position) { this.manager.logPositionAndCleanOldLogs(this.currentPath, @@ -626,7 +620,6 @@ this.currentNbOperations); this.metrics.setAgeOfLastShippedOp( this.entriesArray[currentNbEntries-1].getKey().getWriteTime()); - LOG.debug("Replicated in total: " + this.totalReplicatedEdits); break; } catch (IOException ioe) { @@ -817,4 +810,11 @@ return Long.parseLong(parts[parts.length-1]); } } + + @Override + public String getStats() { + return "Total replicated edits: " + totalReplicatedEdits + + ", currently replicating from: " + this.currentPath + + " at position: " + this.position; + } } Index: src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java (revision 1386908) +++ src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java (working copy) @@ -22,8 +22,14 @@ import java.io.IOException; import java.util.NavigableMap; import java.util.TreeMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -51,6 +57,8 @@ */ public class Replication implements WALActionsListener, ReplicationSourceService, ReplicationSinkService { + private static final Log LOG = + LogFactory.getLog(Replication.class); private boolean replication; private ReplicationSourceManager replicationManager; private final AtomicBoolean replicating = new AtomicBoolean(true); @@ -59,6 +67,14 @@ private ReplicationSink replicationSink; // Hosting server private Server server; + /** Statistics thread schedule pool */ + private final ScheduledExecutorService scheduleThreadPool = + Executors.newScheduledThreadPool(1, + new ThreadFactoryBuilder() + .setNameFormat("Replication Statistics #%d") + .setDaemon(true) + .build()); + private int statsThreadPeriod; /** * Instantiate the replication management (if rep is enabled). @@ -93,6 +109,10 @@ } this.replicationManager = new ReplicationSourceManager(zkHelper, conf, this.server, fs, this.replicating, logDir, oldLogDir) ; + + this.statsThreadPeriod = + this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60); + LOG.debug("ReplicationStatisticsThread " + this.statsThreadPeriod); } else { this.replicationManager = null; this.zkHelper = null; @@ -150,6 +170,9 @@ if (this.replication) { this.replicationManager.init(); this.replicationSink = new ReplicationSink(this.conf, this.server); + this.scheduleThreadPool.scheduleAtFixedRate( + new ReplicationStatisticsThread(this.replicationSink, this.replicationManager), + statsThreadPeriod, statsThreadPeriod, TimeUnit.SECONDS); } } @@ -231,4 +254,32 @@ public void logCloseRequested() { // not interested } + + /* + * Statistics thread. Periodically prints the cache statistics to the log. + */ + static class ReplicationStatisticsThread extends Thread { + + private final ReplicationSink replicationSink; + private final ReplicationSourceManager replicationManager; + + public ReplicationStatisticsThread(final ReplicationSink replicationSink, + final ReplicationSourceManager replicationManager) { + super("ReplicationStatisticsThread"); + this.replicationManager = replicationManager; + this.replicationSink = replicationSink; + } + + @Override + public void run() { + printStats(this.replicationManager.getStats()); + printStats(this.replicationSink.getStats()); + } + + private void printStats(String stats) { + if (!stats.isEmpty()) { + LOG.info(stats); + } + } + } } Index: src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java (revision 1386908) +++ src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java (working copy) @@ -105,6 +105,7 @@ conf1.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true); conf1.setBoolean("dfs.support.append", true); conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100); + conf1.setInt("replication.stats.thread.period.seconds", 5); utility1 = new HBaseTestingUtility(conf1); utility1.startMiniZKCluster(); Index: src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java (revision 1386908) +++ src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java (working copy) @@ -81,4 +81,9 @@ public String getPeerClusterId() { return peerClusterId; } + + @Override + public String getStats() { + return ""; + } } Index: src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkMetrics.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkMetrics.java (revision 1386908) +++ src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkMetrics.java (working copy) @@ -36,6 +36,7 @@ private final MetricsRecord metricsRecord; private MetricsRegistry registry = new MetricsRegistry(); private static ReplicationSinkMetrics instance; + private long lastTimestampForAge = System.currentTimeMillis(); /** Rate of operations applied by the sink */ public final MetricsRate appliedOpsRate = @@ -67,11 +68,24 @@ * @param timestamp write time of the edit */ public void setAgeOfLastAppliedOp(long timestamp) { - ageOfLastAppliedOp.set(System.currentTimeMillis() - timestamp); + lastTimestampForAge = timestamp; + ageOfLastAppliedOp.set(System.currentTimeMillis() - lastTimestampForAge); } + + /** + * Refreshing the age makes sure the value returned is the actual one and + * not the one set a replication time + * @return refreshed age + */ + public long refreshAgeOfLastAppliedOp() { + setAgeOfLastAppliedOp(lastTimestampForAge); + return ageOfLastAppliedOp.get(); + } + @Override public void doUpdates(MetricsContext metricsContext) { synchronized (this) { + refreshAgeOfLastAppliedOp(); this.appliedOpsRate.pushMetric(this.metricsRecord); this.appliedBatchesRate.pushMetric(this.metricsRecord); this.ageOfLastAppliedOp.pushMetric(this.metricsRecord);