diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java
index 676fe25..2a715c0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java
@@ -34,6 +34,7 @@ public class MetricsSink {
   public static final String SINK_APPLIED_OPS = "sink.appliedOps";
 
   private MetricsReplicationSource rms;
+  private long lastTimestampForAge = System.currentTimeMillis();
 
   public MetricsSink() {
     rms = CompatibilitySingletonFactory.getInstance(MetricsReplicationSource.class);
@@ -43,10 +44,22 @@ public class MetricsSink {
    * Set the age of the last applied operation
    *
    * @param timestamp The timestamp of the last operation applied.
+   * @return the age that was set
    */
-  public void setAgeOfLastAppliedOp(long timestamp) {
-    long age = System.currentTimeMillis() - timestamp;
+  public long setAgeOfLastAppliedOp(long timestamp) {
+    lastTimestampForAge = timestamp;
+    long age = System.currentTimeMillis() - lastTimestampForAge;
     rms.setGauge(SINK_AGE_OF_LAST_APPLIED_OP, age);
+    return age;
+  }
+
+  /**
+   * Refreshing the age makes sure the value returned is the actual one and
+   * not the one set at replication time
+   * @return refreshed age
+   */
+  public long refreshAgeOfLastAppliedOp() {
+    return setAgeOfLastAppliedOp(lastTimestampForAge);
   }
 
   /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index 7a2c8aa..0d47d31 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -21,8 +21,14 @@ package org.apache.hadoop.hbase.replication.regionserver;
 import java.io.IOException;
 import java.util.NavigableMap;
 import java.util.TreeMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -52,6 +58,8 @@ import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL;
 @InterfaceAudience.Private
 public class Replication implements WALActionsListener,
   ReplicationSourceService, ReplicationSinkService {
+  private static final Log LOG =
+      LogFactory.getLog(Replication.class);
   private boolean replication;
   private ReplicationSourceManager replicationManager;
   private final AtomicBoolean replicating = new AtomicBoolean(true);
@@ -60,6 +68,9 @@ public class Replication implements WALActionsListener,
   private ReplicationSink replicationSink;
   // Hosting server
   private Server server;
+  /** Statistics thread schedule pool */
+  private ScheduledExecutorService scheduleThreadPool;
+  private int statsThreadPeriod;
 
   /**
    * Instantiate the replication management (if rep is enabled).
@@ -85,6 +96,11 @@ public class Replication implements WALActionsListener,
     this.server = server;
     this.conf = this.server.getConfiguration();
     this.replication = isReplication(this.conf);
+    this.scheduleThreadPool = Executors.newScheduledThreadPool(1,
+      new ThreadFactoryBuilder()
+        .setNameFormat(server.getServerName() + "Replication Statistics #%d")
+        .setDaemon(true)
+        .build());
     if (replication) {
       try {
         this.zkHelper = new ReplicationZookeeper(server, this.replicating);
@@ -94,6 +110,9 @@ public class Replication implements WALActionsListener,
       }
       this.replicationManager = new ReplicationSourceManager(zkHelper, conf, this.server, fs,
           this.replicating, logDir, oldLogDir);
+      this.statsThreadPeriod =
+          this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60);
+      LOG.debug("ReplicationStatisticsThread " + this.statsThreadPeriod);
     } else {
       this.replicationManager = null;
       this.zkHelper = null;
@@ -151,6 +170,9 @@ public class Replication implements WALActionsListener,
     if (this.replication) {
       this.replicationManager.init();
       this.replicationSink = new ReplicationSink(this.conf, this.server);
+      this.scheduleThreadPool.scheduleAtFixedRate(
+        new ReplicationStatisticsThread(this.replicationSink, this.replicationManager),
+        statsThreadPeriod, statsThreadPeriod, TimeUnit.SECONDS);
     }
   }
 
@@ -232,4 +254,32 @@ public class Replication implements WALActionsListener,
   public void logCloseRequested() {
     // not interested
   }
+
+  /*
+   * Statistics thread. Periodically prints the replication statistics to the log.
+   */
+  static class ReplicationStatisticsThread extends Thread {
+
+    private final ReplicationSink replicationSink;
+    private final ReplicationSourceManager replicationManager;
+
+    public ReplicationStatisticsThread(final ReplicationSink replicationSink,
+        final ReplicationSourceManager replicationManager) {
+      super("ReplicationStatisticsThread");
+      this.replicationManager = replicationManager;
+      this.replicationSink = replicationSink;
+    }
+
+    @Override
+    public void run() {
+      printStats(this.replicationManager.getStats());
+      printStats(this.replicationSink.getStats());
+    }
+
+    private void printStats(String stats) {
+      if (!stats.isEmpty()) {
+        LOG.info(stats);
+      }
+    }
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
index 34ba020..67d921c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -73,6 +74,7 @@ public class ReplicationSink {
   private final ExecutorService sharedThreadPool;
   private final HConnection sharedHtableCon;
   private final MetricsSink metrics;
+  private final AtomicLong totalReplicatedEdits = new AtomicLong();
 
   /**
    * Create a sink for replication
@@ -158,7 +160,7 @@ public class ReplicationSink {
       this.metrics.setAgeOfLastAppliedOp(
         entries[entries.length-1].getKey().getWriteTime());
       this.metrics.applyBatch(entries.length);
-      LOG.info("Total replicated: " + totalReplicated);
+      this.totalReplicatedEdits.addAndGet(totalReplicated);
     } catch (IOException ex) {
       LOG.error("Unable to accept edit because:", ex);
       throw ex;
@@ -226,4 +228,15 @@ public class ReplicationSink {
       }
     }
   }
+
+  /**
+   * Get a string representation of this sink's metrics
+   * @return string with the total replicated edits count and the age
+   * in milliseconds of the last applied edit
+   */
+  public String getStats() {
+    return this.totalReplicatedEdits.get() == 0 ? "" : "Sink: " +
+      "age in ms of last applied edit: " + this.metrics.refreshAgeOfLastAppliedOp() +
+      ", total replicated edits: " + this.totalReplicatedEdits;
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 1312f7a..f08738b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -454,9 +454,6 @@ public class ReplicationSource extends Thread
           break;
         }
       }
-      LOG.debug("currentNbOperations:" + currentNbOperations +
-          " and seenEntries:" + seenEntries +
-          " and size: " + (this.reader.getPosition() - startPosition));
       if (currentWALisBeingWrittenTo) {
         return false;
       }
@@ -512,8 +509,6 @@ public class ReplicationSource extends Thread
    */
   protected boolean openReader(int sleepMultiplier) {
     try {
-      LOG.debug("Opening log for replication " + this.currentPath.getName() +
-          " at " + this.position);
       try {
         this.reader = null;
         this.reader = HLogFactory.createReader(this.fs,
@@ -651,7 +646,6 @@ public class ReplicationSource extends Thread
       }
       try {
         AdminProtocol rrs = getRS();
-        LOG.debug("Replicating " + currentNbEntries);
         ProtobufUtil.replicateWALEntry(rrs,
             Arrays.copyOf(this.entriesArray, currentNbEntries));
         if (this.lastLoggedPosition != this.position) {
@@ -663,7 +657,6 @@ public class ReplicationSource extends Thread
         this.metrics.shipBatch(this.currentNbOperations);
         this.metrics.setAgeOfLastShippedOp(
             this.entriesArray[currentNbEntries-1].getKey().getWriteTime());
-        LOG.debug("Replicated in total: " + this.totalReplicatedEdits);
         break;
 
       } catch (IOException ioe) {
@@ -846,4 +839,11 @@ public class ReplicationSource extends Thread
       return Long.parseLong(parts[parts.length-1]);
     }
   }
+
+  @Override
+  public String getStats() {
+    return "Total replicated edits: " + totalReplicatedEdits +
+      ", currently replicating from: " + this.currentPath +
+      " at position: " + this.position;
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
index 9b54816..ff011b6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
@@ -94,4 +94,11 @@ public interface ReplicationSourceInterface {
    */
   public String getPeerClusterId();
 
+  /**
+   * Get a string representation of the current statistics
+   * for this source
+   * @return printable stats
+   */
+  public String getStats();
+
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 85c30a5..b596f73 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -151,7 +151,6 @@ public class ReplicationSourceManager {
   public void logPositionAndCleanOldLogs(Path log, String id, long position,
       boolean queueRecovered, boolean holdLogInZK) {
     String key = log.getName();
-    LOG.info("Going to report log #" + key + " for position " + position + " in " + log);
     this.zkHelper.writeReplicationStatus(key, id, position);
     if (holdLogInZK) {
       return;
     }
@@ -160,8 +159,6 @@
     SortedSet<String> hlogs = this.hlogsById.get(id);
     if (!queueRecovered && hlogs.first() != key) {
       SortedSet<String> hlogSet = hlogs.headSet(key);
-      LOG.info("Removing " + hlogSet.size() +
-          " logs in the list: " + hlogSet);
       for (String hlog : hlogSet) {
         this.zkHelper.removeLogFromList(hlog, id);
       }
@@ -638,4 +635,20 @@
   public FileSystem getFs() {
     return this.fs;
   }
+
+  /**
+   * Get a string representation of all the sources' metrics
+   */
+  public String getStats() {
+    StringBuffer stats = new StringBuffer();
+    for (ReplicationSourceInterface source : sources) {
+      stats.append("Normal source for cluster " + source.getPeerClusterId() + ": ");
+      stats.append(source.getStats() + "\n");
+    }
+    for (ReplicationSourceInterface oldSource : oldsources) {
+      stats.append("Recovered source for cluster/machine(s) " + oldSource.getPeerClusterId() + ": ");
+      stats.append(oldSource.getStats() + "\n");
+    }
+    return stats.toString();
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
index 3a5baa9..f06e947 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
@@ -80,4 +80,9 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
   public String getPeerClusterId() {
     return peerClusterId;
   }
+
+  @Override
+  public String getStats() {
+    return "";
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java
index e7b7068..616d194 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java
@@ -104,6 +104,7 @@ public class TestReplication {
     conf1.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
     conf1.setBoolean("dfs.support.append", true);
     conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
+    conf1.setInt("replication.stats.thread.period.seconds", 5);
 
     utility1 = new HBaseTestingUtility(conf1);
     utility1.startMiniZKCluster();
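
For context on how the pieces above fit together: the patch replaces the per-edit DEBUG logging in ReplicationSource and ReplicationSourceManager with a periodic summary, driven by a daemon scheduled executor that polls getStats() on the sink and on the source manager and only logs non-empty strings. The sketch below (outside the patch) is a minimal standalone illustration of that scheduling pattern, not HBase code: the StatsProvider interface and the sample stats strings are hypothetical stand-ins for ReplicationSink#getStats() and ReplicationSourceManager#getStats(), and it assumes Guava's ThreadFactoryBuilder is on the classpath, as the patch itself does.

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import com.google.common.util.concurrent.ThreadFactoryBuilder;

// Illustrative sketch only; class and variable names here are hypothetical.
public class ReplicationStatsSketch {

  /** Stand-in for the getStats() methods the patch adds to sink and source manager. */
  interface StatsProvider {
    String getStats();
  }

  public static void main(String[] args) throws InterruptedException {
    final List<StatsProvider> providers = Arrays.asList(
        new StatsProvider() {
          @Override
          public String getStats() {
            return "Sink: age in ms of last applied edit: 0, total replicated edits: 42";
          }
        },
        new StatsProvider() {
          @Override
          public String getStats() {
            return "";  // empty stats are skipped, mirroring printStats() in the patch
          }
        });

    final int periodSeconds = 5;  // the patch default is 5 * 60 via replication.stats.thread.period.seconds
    ScheduledExecutorService pool = Executors.newScheduledThreadPool(1,
        new ThreadFactoryBuilder()
            .setNameFormat("Replication Statistics #%d")
            .setDaemon(true)  // daemon threads so the reporter never blocks shutdown
            .build());

    pool.scheduleAtFixedRate(new Runnable() {
      @Override
      public void run() {
        for (StatsProvider p : providers) {
          String stats = p.getStats();
          if (!stats.isEmpty()) {
            System.out.println(stats);  // the patch logs these with LOG.info(stats)
          }
        }
      }
    }, periodSeconds, periodSeconds, TimeUnit.SECONDS);

    Thread.sleep(12000);  // let a couple of reports fire before the JVM exits
  }
}

Running the sketch prints the non-empty sample line every five seconds, mirroring what ReplicationStatisticsThread emits at the replication.stats.thread.period.seconds interval (300 seconds by default, 5 in the test configuration above).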