From fcd773570f61dc44328819a82c38c08c5737a85c Mon Sep 17 00:00:00 2001 From: Andrew Purtell Date: Fri, 12 Sep 2014 09:36:37 -0700 Subject: [PATCH] HBASE-11964 Improve replication source thread handling (Lars Hofhansl, Jesse Yates, Andrew Purtell) Improve replication source thread handling. Limit parallelism when transferring queues. Ensure replication sources terminate properly. --- .../regionserver/ReplicationSource.java | 33 +++++++++++++--------- .../regionserver/ReplicationSourceInterface.java | 2 +- .../regionserver/ReplicationSourceManager.java | 10 ++++--- .../hbase/replication/ReplicationSourceDummy.java | 4 +++ .../regionserver/TestReplicationSourceManager.java | 5 ++-- 5 files changed, 33 insertions(+), 21 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 74424e6..41670ee 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -71,7 +71,7 @@ import com.google.common.util.concurrent.Service; * */ @InterfaceAudience.Private -public class ReplicationSource extends Thread +public class ReplicationSource implements ReplicationSourceInterface { public static final Log LOG = LogFactory.getLog(ReplicationSource.class); @@ -759,20 +759,20 @@ public class ReplicationSource extends Thread return false; } + private Thread myThread = null; + @Override public void startup() { String n = Thread.currentThread().getName(); - Thread.UncaughtExceptionHandler handler = - new Thread.UncaughtExceptionHandler() { - @Override - public void uncaughtException(final Thread t, final Throwable e) { - LOG.error("Unexpected exception in ReplicationSource," + - " currentPath=" + currentPath, e); - } - }; + Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() { + public void uncaughtException(final Thread t, final Throwable e) { + LOG.error("Unexpected exception in ReplicationSource," + " currentPath=" + currentPath, e); + } + }; + myThread = new Thread(this); Threads.setDaemonThreadRunning( - this, n + ".replicationSource," + - this.peerClusterZnode, handler); + myThread, n + ".replicationSource," + + this.peerClusterZnode, handler); } @Override @@ -795,13 +795,18 @@ public class ReplicationSource extends Thread + " because an error occurred: " + reason, cause); } this.running = false; - this.interrupt(); + // Be sure to interrupt the correct thread + if (myThread != null) { + myThread.interrupt(); + } ListenableFuture future = null; if (this.replicationEndpoint != null) { future = this.replicationEndpoint.stop(); } if (join) { - Threads.shutdown(this, this.sleepForRetries); + if (myThread != null) { + Threads.shutdown(myThread, this.sleepForRetries); + } if (future != null) { try { future.get(); @@ -828,7 +833,7 @@ public class ReplicationSource extends Thread } private boolean isActive() { - return !this.stopper.isStopped() && this.running && !isInterrupted(); + return !this.stopper.isStopped() && this.running && !myThread.isInterrupted(); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java index 6388d9b..5489f8f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java @@ -34,7 +34,7 @@ import org.apache.hadoop.hbase.replication.ReplicationQueues; * Interface that defines a replication source */ @InterfaceAudience.Private -public interface ReplicationSourceInterface { +public interface ReplicationSourceInterface extends Runnable { /** * Initializer for the source diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index e196588..1f5e06d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -508,7 +508,7 @@ public class ReplicationSourceManager implements ReplicationListener { * Class responsible to setup new ReplicationSources to take care of the * queues from dead region servers. */ - class NodeFailoverWorker extends Thread { + class NodeFailoverWorker implements Runnable { private String rsZnode; private final ReplicationQueues rq; @@ -521,7 +521,6 @@ public class ReplicationSourceManager implements ReplicationListener { */ public NodeFailoverWorker(String rsZnode, final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers, final UUID clusterId) { - super("Failover-for-"+rsZnode); this.rsZnode = rsZnode; this.rq = replicationQueues; this.rp = replicationPeers; @@ -559,7 +558,11 @@ public class ReplicationSourceManager implements ReplicationListener { for (Map.Entry> entry : newQueues.entrySet()) { String peerId = entry.getKey(); + SortedSet hlogsSet = entry.getValue(); try { + if (LOG.isDebugEnabled()) { + LOG.debug("Attempting to pick up replication queue for peerId: " + peerId + ", hlogs: " + hlogsSet); + } // there is not an actual peer defined corresponding to peerId for the failover. ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId); String actualPeerId = replicationQueueInfo.getPeerId(); @@ -584,11 +587,10 @@ public class ReplicationSourceManager implements ReplicationListener { break; } oldsources.add(src); - SortedSet hlogsSet = entry.getValue(); for (String hlog : hlogsSet) { src.enqueueLog(new Path(oldLogDir, hlog)); } - src.startup(); + src.run(); hlogsByIdRecoveredQueues.put(peerId, hlogsSet); } catch (IOException e) { // TODO manage it diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java index f463f76..e22bbb8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java @@ -89,4 +89,8 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface { public String getStats() { return ""; } + + @Override + public void run() { + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index 99ad601..4e215bd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -305,8 +305,9 @@ public class TestReplicationSourceManager { NodeFailoverWorker w1 = manager.new NodeFailoverWorker(server.getServerName().getServerName(), rq1, rp1, new UUID( new Long(1), new Long(2))); - w1.start(); - w1.join(5000); + Thread t = new Thread(w1); + t.start(); + t.join(5000); assertEquals(1, manager.getHlogsByIdRecoveredQueues().size()); String id = "1-" + server.getServerName().getServerName(); assertEquals(files, manager.getHlogsByIdRecoveredQueues().get(id)); -- 1.8.5.2 (Apple Git-48)