Replication: keep retrying to read the peer cluster's UUID instead of
terminating the source on the first ZooKeeper error.

ReplicationSource.run() now polls ReplicationZookeeper.getPeerUUID() with the
usual sleepForRetries() back-off until the peer's cluster id is known. The
retry loop is bounded by isActive() so that a source which gets terminated
while the peer's ensemble is unreachable can still exit.

ReplicationZookeeper gains getPeerUUID(), getUUIDForCluster() and the private
reconnectPeer() helper (extracted from the slave-address lookup code).

NOTE(review): line breaks and indentation of context/removed lines were
reconstructed from a whitespace-collapsed copy of this patch. Apply with
"patch -p0"; if a hunk is rejected because of whitespace, retry with
"patch -p0 -l".

Index: src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java	(revision 1363570)
+++ src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java	(working copy)
@@ -185,8 +185,7 @@
     this.metrics = new ReplicationSourceMetrics(peerClusterZnode);
 
     try {
-      this.clusterId = UUID.fromString(ClusterId.readClusterIdZNode(zkHelper
-          .getZookeeperWatcher()));
+      this.clusterId = zkHelper.getUUIDForCluster(zkHelper.getZookeeperWatcher());
     } catch (KeeperException ke) {
       throw new IOException("Could not read cluster id", ke);
     }
@@ -245,13 +244,21 @@
     if (!this.isActive()) {
       return;
     }
+    int sleepMultiplier = 1;
     // delay this until we are in an asynchronous thread
-    try {
-      this.peerClusterId = UUID.fromString(ClusterId
-          .readClusterIdZNode(zkHelper.getPeerClusters().get(peerId).getZkw()));
-    } catch (KeeperException ke) {
-      this.terminate("Could not read peer's cluster id", ke);
+    // keep retrying only while this source is active, otherwise a terminated
+    // source would spin here forever if the peer's ensemble stays unreachable
+    while (this.isActive() && this.peerClusterId == null) {
+      this.peerClusterId = zkHelper.getPeerUUID(this.peerId);
+      if (this.isActive() && this.peerClusterId == null) {
+        if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
+          sleepMultiplier++;
+        }
+      }
     }
+    // resetting to 1 to reuse later
+    sleepMultiplier = 1;
+
     LOG.info("Replicating "+clusterId + " -> " + peerClusterId);
 
     // If this is recovered, the queue is already full and the first log
@@ -265,7 +272,6 @@
             peerClusterZnode, e);
       }
     }
-    int sleepMultiplier = 1;
     // Loop until we close down
     while (isActive()) {
       // Sleep until replication is enabled again
Index: src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java	(revision 1363563)
+++ src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java	(working copy)
@@ -29,6 +29,7 @@
 import java.util.SortedSet;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
@@ -38,7 +39,9 @@
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.replication.regionserver.Replication;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ClusterId;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
@@ -238,19 +241,7 @@
     try {
       addresses = fetchSlavesAddresses(peer.getZkw());
     } catch (KeeperException ke) {
-      if (ke instanceof ConnectionLossException
-        || ke instanceof SessionExpiredException) {
-        LOG.warn(
-          "Lost the ZooKeeper connection for peer " + peer.getClusterKey(),
-          ke);
-        try {
-          peer.reloadZkWatcher();
-        } catch(IOException io) {
-          LOG.warn(
-            "Creation of ZookeeperWatcher failed for peer "
-            + peer.getClusterKey(), io);
-        }
-      }
+      reconnectPeer(ke, peer);
       addresses = Collections.emptyList();
     }
     peer.setRegionServers(addresses);
@@ -798,6 +789,50 @@
     return data == null || data.length() == 0 ? 0 : Long.parseLong(data);
   }
 
+  /**
+   * Returns the cluster UUID of the provided peer. If reading it fails with a
+   * KeeperException, null is returned without retrying; on a connection loss or session
+   * expiration the peer's ZK watcher is reloaded first so that a later call can succeed.
+   * @param peerId the peer's ID that will be converted into a UUID
+   * @return a UUID, or null if it could not be read from the peer's ZK ensemble
+   */
+  public UUID getPeerUUID(String peerId) {
+    ReplicationPeer peer = getPeerClusters().get(peerId);
+    UUID peerUUID = null;
+    try {
+      peerUUID = getUUIDForCluster(peer.getZkw());
+    } catch (KeeperException ke) {
+      reconnectPeer(ke, peer);
+    }
+    return peerUUID;
+  }
+
+  /**
+   * Get the UUID for the provided ZK watcher. Doesn't handle any ZK exceptions
+   * @param zkw watcher connected to an ensemble
+   * @return the UUID read from zookeeper
+   * @throws KeeperException if reading the cluster id from ZooKeeper fails
+   */
+  public UUID getUUIDForCluster(ZooKeeperWatcher zkw) throws KeeperException {
+    return UUID.fromString(ClusterId.readClusterIdZNode(zkw));
+  }
+
+  private void reconnectPeer(KeeperException ke, ReplicationPeer peer) {
+    if (ke instanceof ConnectionLossException
+      || ke instanceof SessionExpiredException) {
+      LOG.warn(
+        "Lost the ZooKeeper connection for peer " + peer.getClusterKey(),
+        ke);
+      try {
+        peer.reloadZkWatcher();
+      } catch(IOException io) {
+        LOG.warn(
+          "Creation of ZookeeperWatcher failed for peer "
+          + peer.getClusterKey(), io);
+      }
+    }
+  }
+
   public void registerRegionServerListener(ZooKeeperListener listener) {
     this.zookeeper.registerListener(listener);
   }