Index: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java (revision 1383787) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java (working copy) @@ -570,7 +570,23 @@ return; } if (!zkHelper.lockOtherRS(rsZnode)) { - return; + try { + while(!zkHelper.lockOtherRS(rsZnode)){ + try { + Thread.sleep(conf.getLong("replication.failover.spin.waittime", 1000)); + } catch (InterruptedException e) { + LOG.debug("InterruptedException happend during checking node:"+rsZnode,e); + return; + } + if(ZKUtil.checkExists(zkHelper.getZookeeperWatcher(), rsZnode) == -1){ + LOG.info("Node "+rsZnode+" has removed by other rs, quit NodeFailoverWorker spin check!"); + return; + } + } + } catch (KeeperException e) { + LOG.error("Failed checking node:"+rsZnode+" status.",e); + return; + } } LOG.info("Moving " + rsZnode + "'s hlogs to my queue"); SortedMap> newQueues = Index: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (revision 1381468) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (working copy) @@ -797,7 +797,8 @@ return false; } String p = ZKUtil.joinZNode(parent, RS_LOCK_ZNODE); - ZKUtil.createAndWatch(this.zookeeper, p, lockToByteArray(rsServerNameZnode)); + ZKUtil.createNoneExistEphemeralNodeAndWatch(this.zookeeper, p, + lockToByteArray(rsServerNameZnode)); } catch (KeeperException e) { // This exception will pop up if the znode under which we're trying to // create the lock is already deleted by another region server, meaning @@ -869,6 +870,7 @@ // logging, so just use the already serialized version ZKUtil.createAndWatch(this.zookeeper, child, positionBytes); logQueue.add(hlog); + ZKUtil.deleteNodeRecursively(this.zookeeper, z); } } } catch (KeeperException e) { Index: hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java (revision 1381468) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java (working copy) @@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; @@ -789,6 +790,20 @@ } return true; } + + public static boolean createNoneExistEphemeralNodeAndWatch(ZooKeeperWatcher zkw, + String znode, byte [] data) + throws KeeperException { + try { + waitForZKConnectionIfAuthenticating(zkw); + zkw.getRecoverableZooKeeper().create(znode, data, createACL(zkw, znode), + CreateMode.EPHEMERAL); + } catch (InterruptedException e) { + LOG.info("Interrupted", e); + Thread.currentThread().interrupt(); + } + return true; + } /** * Creates the specified znode to be a persistent node carrying the specified