Index: src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java (revision 168563) +++ src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java (working copy) @@ -42,6 +42,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.replication.ReplicationZookeeper; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.zookeeper.KeeperException; @@ -570,7 +571,20 @@ return; } if (!zkHelper.lockOtherRS(rsZnode)) { - return; + try { + while(!zkHelper.lockOtherRS(rsZnode)){ + Thread.sleep(1000); + if(ZKUtil.checkExists(zkHelper.getZookeeperWatcher(), rsZnode)!= -1){ + return; + } + } + } catch (KeeperException e) { + LOG.error("Failed checking node:"+rsZnode+" status.",e); + return; + }catch (InterruptedException e) { + LOG.error("Failed checking node:"+rsZnode+" status.",e); + return; + } } LOG.info("Moving " + rsZnode + "'s hlogs to my queue"); SortedMap> newQueues = Index: src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (revision 166570) +++ src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (working copy) @@ -645,7 +645,7 @@ return false; } String p = ZKUtil.joinZNode(parent, RS_LOCK_ZNODE); - ZKUtil.createAndWatch(this.zookeeper, p, Bytes.toBytes(rsServerNameZnode)); + ZKUtil.createEphemeralNodeAndWatch(this.zookeeper, p, Bytes.toBytes(rsServerNameZnode)); } catch (KeeperException e) { // This exception will pop up if the znode under which we're trying to // create the lock is already deleted by another region server, meaning @@ -709,6 +709,7 @@ String child = ZKUtil.joinZNode(newClusterZnode, hlog); ZKUtil.createAndWatch(this.zookeeper, child, position); logQueue.add(hlog); + ZKUtil.deleteNodeRecursively(this.zookeeper, z); } } } catch (KeeperException e) {