diff --git a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java index 1281993..e5b2f7a 100644 --- a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java +++ b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java @@ -22,8 +22,14 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.io.IOException; import java.util.NavigableMap; import java.util.TreeMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -51,6 +57,8 @@ import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL; */ public class Replication implements WALActionsListener, ReplicationSourceService, ReplicationSinkService { + private static final Log LOG = + LogFactory.getLog(Replication.class); private boolean replication; private ReplicationSourceManager replicationManager; private final AtomicBoolean replicating = new AtomicBoolean(true); @@ -59,6 +67,9 @@ public class Replication implements WALActionsListener, private ReplicationSink replicationSink; // Hosting server private Server server; + /** Statistics thread schedule pool */ + private ScheduledExecutorService scheduleThreadPool; + private int statsThreadPeriod; /** * Instantiate the replication management (if rep is enabled). @@ -84,6 +95,11 @@ public class Replication implements WALActionsListener, this.server = server; this.conf = this.server.getConfiguration(); this.replication = isReplication(this.conf); + this.scheduleThreadPool = Executors.newScheduledThreadPool(1, + new ThreadFactoryBuilder() + .setNameFormat(server.getServerName() + "Replication Statistics #%d") + .setDaemon(true) + .build()); if (replication) { try { this.zkHelper = new ReplicationZookeeper(server, this.replicating); @@ -93,6 +109,10 @@ public class Replication implements WALActionsListener, } this.replicationManager = new ReplicationSourceManager(zkHelper, conf, this.server, fs, this.replicating, logDir, oldLogDir) ; + + this.statsThreadPeriod = + this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60); + LOG.debug("ReplicationStatisticsThread " + this.statsThreadPeriod); } else { this.replicationManager = null; this.zkHelper = null; @@ -150,6 +170,9 @@ public class Replication implements WALActionsListener, if (this.replication) { this.replicationManager.init(); this.replicationSink = new ReplicationSink(this.conf, this.server); + this.scheduleThreadPool.scheduleAtFixedRate( + new ReplicationStatisticsThread(this.replicationSink, this.replicationManager), + statsThreadPeriod, statsThreadPeriod, TimeUnit.SECONDS); } } @@ -231,4 +254,32 @@ public class Replication implements WALActionsListener, public void logCloseRequested() { // not interested } + + /* + * Statistics thread. Periodically prints the cache statistics to the log. + */ + static class ReplicationStatisticsThread extends Thread { + + private final ReplicationSink replicationSink; + private final ReplicationSourceManager replicationManager; + + public ReplicationStatisticsThread(final ReplicationSink replicationSink, + final ReplicationSourceManager replicationManager) { + super("ReplicationStatisticsThread"); + this.replicationManager = replicationManager; + this.replicationSink = replicationSink; + } + + @Override + public void run() { + printStats(this.replicationManager.getStats()); + printStats(this.replicationSink.getStats()); + } + + private void printStats(String stats) { + if (!stats.isEmpty()) { + LOG.info(stats); + } + } + } } diff --git a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java index 41b19ff..18f0c46 100644 --- a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java +++ b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java @@ -47,6 +47,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; /** * This class is responsible for replicating the edits coming @@ -71,6 +72,7 @@ public class ReplicationSink { private final ExecutorService sharedThreadPool; private final HConnection sharedHtableCon; private final ReplicationSinkMetrics metrics; + private final AtomicLong totalReplicatedEdits = new AtomicLong(); /** * Create a sink for replication @@ -156,7 +158,7 @@ public class ReplicationSink { this.metrics.setAgeOfLastAppliedOp( entries[entries.length-1].getKey().getWriteTime()); this.metrics.appliedBatchesRate.inc(1); - LOG.info("Total replicated: " + totalReplicated); + this.totalReplicatedEdits.addAndGet(totalReplicated); } catch (IOException ex) { LOG.error("Unable to accept edit because:", ex); throw ex; @@ -224,4 +226,15 @@ public class ReplicationSink { } } } + + /** + * Get a string representation of this sink's metrics + * @return string with the total replicated edits count and the date + * of the last edit that was applied + */ + public String getStats() { + return this.totalReplicatedEdits.get() == 0 ? "" : "Sink: " + + "age in ms of last applied edit: " + this.metrics.refreshAgeOfLastAppliedOp() + + ", total replicated edits: " + this.totalReplicatedEdits; + } } diff --git a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkMetrics.java b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkMetrics.java index ae14375..c5bfd78 100644 --- a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkMetrics.java +++ b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkMetrics.java @@ -36,6 +36,7 @@ public class ReplicationSinkMetrics implements Updater { private final MetricsRecord metricsRecord; private MetricsRegistry registry = new MetricsRegistry(); private static ReplicationSinkMetrics instance; + private long lastTimestampForAge = System.currentTimeMillis(); /** Rate of operations applied by the sink */ public final MetricsRate appliedOpsRate = @@ -67,11 +68,24 @@ public class ReplicationSinkMetrics implements Updater { * @param timestamp write time of the edit */ public void setAgeOfLastAppliedOp(long timestamp) { - ageOfLastAppliedOp.set(System.currentTimeMillis() - timestamp); + lastTimestampForAge = timestamp; + ageOfLastAppliedOp.set(System.currentTimeMillis() - lastTimestampForAge); } + + /** + * Refreshing the age makes sure the value returned is the actual one and + * not the one set a replication time + * @return refreshed age + */ + public long refreshAgeOfLastAppliedOp() { + setAgeOfLastAppliedOp(lastTimestampForAge); + return ageOfLastAppliedOp.get(); + } + @Override public void doUpdates(MetricsContext metricsContext) { synchronized (this) { + refreshAgeOfLastAppliedOp(); this.appliedOpsRate.pushMetric(this.metricsRecord); this.appliedBatchesRate.pushMetric(this.metricsRecord); this.ageOfLastAppliedOp.pushMetric(this.metricsRecord); diff --git a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 09b8797..fac42bd 100644 --- a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -450,9 +450,6 @@ public class ReplicationSource extends Thread break; } } - LOG.debug("currentNbOperations:" + currentNbOperations + - " and seenEntries:" + seenEntries + - " and size: " + (this.reader.getPosition() - startPosition)); if (currentWALisBeingWrittenTo) { return false; } @@ -508,8 +505,6 @@ public class ReplicationSource extends Thread */ protected boolean openReader(int sleepMultiplier) { try { - LOG.debug("Opening log for replication " + this.currentPath.getName() + - " at " + this.position); try { this.reader = null; this.reader = HLog.getReader(this.fs, this.currentPath, this.conf); @@ -646,7 +641,6 @@ public class ReplicationSource extends Thread } try { HRegionInterface rrs = getRS(); - LOG.debug("Replicating " + currentNbEntries); rrs.replicateLogEntries(Arrays.copyOf(this.entriesArray, currentNbEntries)); if (this.lastLoggedPosition != this.position) { this.manager.logPositionAndCleanOldLogs(this.currentPath, @@ -659,7 +653,6 @@ public class ReplicationSource extends Thread this.currentNbOperations); this.metrics.setAgeOfLastShippedOp( this.entriesArray[currentNbEntries-1].getKey().getWriteTime()); - LOG.debug("Replicated in total: " + this.totalReplicatedEdits); break; } catch (IOException ioe) { @@ -850,4 +843,11 @@ public class ReplicationSource extends Thread return Long.parseLong(parts[parts.length-1]); } } + + @Override + public String getStats() { + return "Total replicated edits: " + totalReplicatedEdits + + ", currently replicating from: " + this.currentPath + + " at position: " + this.position; + } } diff --git a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java index 6778c43..c645273 100644 --- a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java +++ b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java @@ -93,4 +93,11 @@ public interface ReplicationSourceInterface { */ public String getPeerClusterId(); + /** + * Get a string representation of the current statistics + * for this source + * @return printable stats + */ + public String getStats(); + } diff --git a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index b67661f..241be40 100644 --- a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -150,7 +150,6 @@ public class ReplicationSourceManager { public void logPositionAndCleanOldLogs(Path log, String id, long position, boolean queueRecovered, boolean holdLogInZK) { String key = log.getName(); - LOG.info("Going to report log #" + key + " for position " + position + " in " + log); this.zkHelper.writeReplicationStatus(key, id, position); if (holdLogInZK) { return; @@ -159,8 +158,6 @@ public class ReplicationSourceManager { SortedSet hlogs = this.hlogsById.get(id); if (!queueRecovered && hlogs.first() != key) { SortedSet hlogSet = hlogs.headSet(key); - LOG.info("Removing " + hlogSet.size() + - " logs in the list: " + hlogSet); for (String hlog : hlogSet) { this.zkHelper.removeLogFromList(hlog, id); } @@ -637,4 +634,20 @@ public class ReplicationSourceManager { public FileSystem getFs() { return this.fs; } + + /** + * Get a string representation of all the sources' metrics + */ + public String getStats() { + StringBuffer stats = new StringBuffer(); + for (ReplicationSourceInterface source : sources) { + stats.append("Normal source for cluster " + source.getPeerClusterId() + ": "); + stats.append(source.getStats() + "\n"); + } + for (ReplicationSourceInterface oldSource : oldsources) { + stats.append("Recovered source for cluster/machine(s) " + oldSource.getPeerClusterId() + ": "); + stats.append(oldSource.getStats()+ "\n"); + } + return stats.toString(); + } } diff --git a/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java index 2daf643..a6a059d 100644 --- a/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java +++ b/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java @@ -81,4 +81,9 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface { public String getPeerClusterId() { return peerClusterId; } + + @Override + public String getStats() { + return ""; + } } diff --git a/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java b/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java index 96eb211..ebfcef5 100644 --- a/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java +++ b/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java @@ -105,6 +105,7 @@ public class TestReplication { conf1.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true); conf1.setBoolean("dfs.support.append", true); conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100); + conf1.setInt("replication.stats.thread.period.seconds", 5); utility1 = new HBaseTestingUtility(conf1); utility1.startMiniZKCluster();