Index: src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java (revision 1353679) +++ src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java (working copy) @@ -78,7 +78,7 @@ // The path to the latest log we saw, for new coming sources private Path latestPath; // List of all the other region servers in this cluster - private final List otherRegionServers; + private final List otherRegionServers = new ArrayList(); // Path to the hlogs directories private final Path logDir; // Path to the hlog archive @@ -119,12 +119,9 @@ this.sleepBeforeFailover = conf.getLong("replication.sleep.before.failover", 2000); this.zkHelper.registerRegionServerListener( new OtherRegionServerWatcher(this.zkHelper.getZookeeperWatcher())); - List otherRSs = - this.zkHelper.getRegisteredRegionServers(); this.zkHelper.registerRegionServerListener( new PeersWatcher(this.zkHelper.getZookeeperWatcher())); this.zkHelper.listPeersIdsAndWatch(); - this.otherRegionServers = otherRSs == null ? new ArrayList() : otherRSs; // It's preferable to failover 1 RS at a time, but with good zk servers // more could be processed at the same time. int nbWorkers = conf.getInt("replication.executor.workers", 1); @@ -179,6 +176,7 @@ return; } synchronized (otherRegionServers) { + refreshOtherRegionServersList(); LOG.info("Current list of replicators: " + currentReplicators + " other RSs: " + otherRegionServers); } @@ -398,6 +396,24 @@ } /** + * Reads the list of region servers from ZK and updates the + * local view of it + * @return true if the update was successful, else false + */ + private boolean refreshOtherRegionServersList() { + List newRsList = zkHelper.getRegisteredRegionServers(); + if (newRsList == null) { + return false; + } else { + synchronized (otherRegionServers) { + otherRegionServers.clear(); + otherRegionServers.addAll(newRsList); + } + } + return true; + } + + /** * Watcher used to be notified of the other region server's death * in the local cluster. It initiates the process to transfer the queues * if it is able to grab the lock. @@ -416,7 +432,7 @@ * @param path full path of the new node */ public void nodeCreated(String path) { - refreshRegionServersList(path); + refreshListIfRightPath(path); } /** @@ -427,7 +443,7 @@ if (stopper.isStopped()) { return; } - boolean cont = refreshRegionServersList(path); + boolean cont = refreshListIfRightPath(path); if (!cont) { return; } @@ -443,23 +459,14 @@ if (stopper.isStopped()) { return; } - refreshRegionServersList(path); + refreshListIfRightPath(path); } - private boolean refreshRegionServersList(String path) { + private boolean refreshListIfRightPath(String path) { if (!path.startsWith(zkHelper.getZookeeperWatcher().rsZNode)) { return false; } - List newRsList = (zkHelper.getRegisteredRegionServers()); - if (newRsList == null) { - return false; - } else { - synchronized (otherRegionServers) { - otherRegionServers.clear(); - otherRegionServers.addAll(newRsList); - } - } - return true; + return refreshOtherRegionServersList(); } }