Index: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java (revision 1380113) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java (working copy) @@ -43,6 +43,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.replication.ReplicationZookeeper; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.zookeeper.KeeperException; @@ -571,7 +572,22 @@ return; } if (!zkHelper.lockOtherRS(rsZnode)) { - return; + try { + while(!zkHelper.lockOtherRS(rsZnode)){ + try { + Thread.sleep(conf.getLong("replication.failover.spin.waittime", 10000)); + } catch (InterruptedException e) { + LOG.debug("InterruptedException happend during checking node:"+rsZnode,e); + continue; + } + if(ZKUtil.checkExists(zkHelper.getZookeeperWatcher(), rsZnode) == -1){ + return; + } + } + } catch (KeeperException e) { + LOG.error("Failed checking node:"+rsZnode+" status.",e); + return; + } } LOG.info("Moving " + rsZnode + "'s hlogs to my queue"); SortedMap> newQueues = Index: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (revision 1380113) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (working copy) @@ -798,7 +798,8 @@ return false; } String p = ZKUtil.joinZNode(parent, RS_LOCK_ZNODE); - ZKUtil.createAndWatch(this.zookeeper, p, lockToByteArray(rsServerNameZnode)); + ZKUtil.createEphemeralNodeAndWatch(this.zookeeper, p, + lockToByteArray(rsServerNameZnode)); } catch (KeeperException e) { // This exception will pop up if the znode under which we're trying to // create the lock is already deleted by another region server, meaning @@ -870,6 +871,7 @@ // logging, so just use the already serialized version ZKUtil.createAndWatch(this.zookeeper, child, positionBytes); logQueue.add(hlog); + ZKUtil.deleteNodeRecursively(this.zookeeper, z); } } } catch (KeeperException e) {