Index: src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java (revision 1084414) +++ src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java (working copy) @@ -27,8 +27,13 @@ import java.util.SortedMap; import java.util.SortedSet; import java.util.TreeSet; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -78,6 +83,8 @@ private final Path oldLogDir; // The number of ms that we wait before moving znodes, HBASE-3596 private final long sleepBeforeFailover; + // Homemade executer service for replication + private final ThreadPoolExecutor executor; /** * Creates a replication manager and sets the watch on all the other @@ -116,6 +123,17 @@ new PeersWatcher(this.zkHelper.getZookeeperWatcher())); this.zkHelper.listPeersIdsAndWatch(); this.otherRegionServers = otherRSs == null ? new ArrayList() : otherRSs; + // It's preferable to failover 1 RS at a time, but with good zk servers + // more could be processed at the same time. + int nbWorkers = conf.getInt("replication.executor.workers", 1); + // use a short 100ms sleep since this could be done inline with a RS startup + // even if we fail, other region servers can take care of it + this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers, + 100, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue()); + ThreadFactoryBuilder tfb = new ThreadFactoryBuilder(); + tfb.setNameFormat("ReplicationExecutor-%d"); + this.executor.setThreadFactory(tfb.build()); } /** @@ -199,6 +217,7 @@ * Terminate the replication on this region server */ public void join() { + this.executor.shutdown(); if (this.sources.size() == 0) { this.zkHelper.deleteOwnRSZNode(); } @@ -298,51 +317,13 @@ * @param rsZnode */ public void transferQueues(String rsZnode) { - // Wait a bit before transferring the queues, we may be shutting down. - // This sleep may not be enough in some cases. + NodeFailoverWorker transfer = new NodeFailoverWorker(rsZnode); try { - Thread.sleep(this.sleepBeforeFailover); - } catch (InterruptedException e) { - LOG.warn("Interrupted while waiting before transferring a queue."); - Thread.currentThread().interrupt(); + this.executor.execute(transfer); + } catch (RejectedExecutionException ex) { + LOG.info("Cancelling the transfer of " + rsZnode + + " because of " + ex.getMessage()); } - // We try to lock that rs' queue directory - if (this.stopper.isStopped()) { - LOG.info("Not transferring queue since we are shutting down"); - return; - } - if (!this.zkHelper.lockOtherRS(rsZnode)) { - return; - } - LOG.info("Moving " + rsZnode + "'s hlogs to my queue"); - SortedMap> newQueues = - this.zkHelper.copyQueuesFromRS(rsZnode); - this.zkHelper.deleteRsQueues(rsZnode); - if (newQueues == null || newQueues.size() == 0) { - return; - } - - for (Map.Entry> entry : newQueues.entrySet()) { - String peerId = entry.getKey(); - try { - ReplicationSourceInterface src = getReplicationSource(this.conf, - this.fs, this, this.stopper, this.replicating, peerId); - if (!zkHelper.getPeerClusters().containsKey(src.getPeerClusterId())) { - src.terminate("Recovered queue doesn't belong to any current peer"); - break; - } - this.oldsources.add(src); - for (String hlog : entry.getValue()) { - src.enqueueLog(new Path(this.oldLogDir, hlog)); - } - // TODO set it to what's in ZK - src.setSourceEnabled(true); - src.startup(); - } catch (IOException e) { - // TODO manage it - LOG.error("Failed creating a source", e); - } - } } /** @@ -525,6 +506,73 @@ } /** + * Class responsible to setup new ReplicationSources to take care of the + * queues from dead region servers. + */ + class NodeFailoverWorker extends Thread { + + private String rsZnode; + + /** + * + * @param rsZnode + */ + public NodeFailoverWorker(String rsZnode) { + super("Failover-for-"+rsZnode); + this.rsZnode = rsZnode; + } + + @Override + public void run() { + // Wait a bit before transferring the queues, we may be shutting down. + // This sleep may not be enough in some cases. + try { + Thread.sleep(sleepBeforeFailover); + } catch (InterruptedException e) { + LOG.warn("Interrupted while waiting before transferring a queue."); + Thread.currentThread().interrupt(); + } + // We try to lock that rs' queue directory + if (stopper.isStopped()) { + LOG.info("Not transferring queue since we are shutting down"); + return; + } + if (!zkHelper.lockOtherRS(rsZnode)) { + return; + } + LOG.info("Moving " + rsZnode + "'s hlogs to my queue"); + SortedMap> newQueues = + zkHelper.copyQueuesFromRS(rsZnode); + zkHelper.deleteRsQueues(rsZnode); + if (newQueues == null || newQueues.size() == 0) { + return; + } + + for (Map.Entry> entry : newQueues.entrySet()) { + String peerId = entry.getKey(); + try { + ReplicationSourceInterface src = getReplicationSource(conf, + fs, ReplicationSourceManager.this, stopper, replicating, peerId); + if (!zkHelper.getPeerClusters().containsKey(src.getPeerClusterId())) { + src.terminate("Recovered queue doesn't belong to any current peer"); + break; + } + oldsources.add(src); + for (String hlog : entry.getValue()) { + src.enqueueLog(new Path(oldLogDir, hlog)); + } + // TODO set it to what's in ZK + src.setSourceEnabled(true); + src.startup(); + } catch (IOException e) { + // TODO manage it + LOG.error("Failed creating a source", e); + } + } + } + } + + /** * Get the directory where hlogs are archived * @return the directory where hlogs are archived */