Index: src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java (revision 1171343)
+++ src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java (working copy)
@@ -19,11 +19,15 @@
  */
 package org.apache.hadoop.hbase.replication;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 
@@ -33,7 +37,8 @@
  * of this class as it doesn't encapsulate any specific functionality e.g.
  * it's a container class.
  */
-public class ReplicationPeer {
+public class ReplicationPeer implements Abortable {
+  private static final Log LOG = LogFactory.getLog(ReplicationPeer.class);
 
   private final String clusterKey;
   private final String id;
@@ -52,11 +57,12 @@
-   * @param zkw zookeeper connection to the peer
+   * @throws IOException if the ZooKeeperWatcher for the peer cannot be created
    */
   public ReplicationPeer(Configuration conf, String key,
-      String id, ZooKeeperWatcher zkw) {
+      String id) throws IOException {
     this.conf = conf;
     this.clusterKey = key;
     this.id = id;
-    this.zkw = zkw;
+    zkw = new ZooKeeperWatcher(conf,
+        "connection to cluster: " + id, this);
   }
 
   /**
@@ -116,4 +122,28 @@
   public Configuration getConfiguration() {
     return conf;
   }
+
+  /**
+   * Logs the abort request and otherwise ignores it. This peer passes itself
+   * as the {@link Abortable} of the ZooKeeperWatcher it creates.
+   * @param why reason given for the abort
+   * @param e cause of the abort
+   */
+  @Override
+  public void abort(String why, Throwable e) {
+    LOG.warn(why, e);
+  }
+
+  /**
+   * Closes the current ZooKeeperWatcher and replaces it with a new one
+   * built from the same configuration. The old watcher is closed first, so
+   * if creation fails this peer is left holding the closed watcher.
+   * @throws IOException if the new ZooKeeperWatcher cannot be created
+   */
+  public void reloadZkWatcher() throws IOException {
+    LOG.info("Refreshing ZookeeperWatcher for peer " + clusterKey);
+    zkw.close();
+    zkw = new ZooKeeperWatcher(conf,
+        "connection to cluster: " + id, this);
+  }
 }
Index: src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (revision 1171343)
+++ src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (working copy)
@@ -210,10 +210,9 @@
   /**
    * Select a number of peers at random using the ratio. Mininum 1.
    */
-  private void chooseSinks() throws KeeperException {
+  private void chooseSinks() {
     this.currentPeers.clear();
-    List addresses =
-      this.zkHelper.getSlavesAddresses(peerId);
+    List addresses = this.zkHelper.getSlavesAddresses(peerId);
     Set setOfAddr = new HashSet();
     int nbPeers = (int) (Math.ceil(addresses.size() * ratio));
     LOG.info("Getting " + nbPeers +
@@ -435,8 +434,6 @@
           Thread.sleep(this.sleepForRetries);
         } catch (InterruptedException e) {
           LOG.error("Interrupted while trying to connect to sinks", e);
-        } catch (KeeperException e) {
-          LOG.error("Error talking to zookeeper, retrying", e);
         }
       }
     }
@@ -627,10 +624,7 @@
         } while (this.isActive() && down );
       } catch (InterruptedException e) {
         LOG.debug("Interrupted while trying to contact the peer cluster");
-      } catch (KeeperException e) {
-        LOG.error("Error talking to zookeeper, retrying", e);
       }
-
     }
   }
 }
Index: src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (revision 1171343)
+++ src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (working copy)
@@ -43,6 +43,8 @@
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.ConnectionLossException;
+import org.apache.zookeeper.KeeperException.SessionExpiredException;
 
 /**
  * This class serves as a helper for all things related to zookeeper
@@ -210,8 +212,7 @@
* @param peerClusterId (byte) the cluster to interrogate * @return addresses of all region servers */ - public List getSlavesAddresses(String peerClusterId) - throws KeeperException { + public List getSlavesAddresses(String peerClusterId) { if (this.peerClusters.size() == 0) { return new ArrayList(0); } @@ -219,7 +220,29 @@ if (peer == null) { return new ArrayList(0); } - peer.setRegionServers(fetchSlavesAddresses(peer.getZkw())); + + List addresses; + try { + addresses = fetchSlavesAddresses(peer.getZkw()); + } catch (KeeperException ke) { + if (ke instanceof ConnectionLossException + || ke instanceof SessionExpiredException) { + LOG.warn( + "Lost the ZooKeeper connection for peer " + peer.getClusterKey(), + ke); + try { + peer.reloadZkWatcher(); + } catch(IOException io) { + LOG.warn( + "Creation of ZookeeperWatcher failed for peer " + + peer.getClusterKey(), io); + } + } + addresses = new ArrayList(0); + peer.setRegionServers(addresses); + return addresses; + } + peer.setRegionServers(addresses); return peer.getRegionServers(); } @@ -229,13 +252,9 @@ * @return list of region server addresses or an empty list if the slave * is unavailable */ - private List fetchSlavesAddresses(ZooKeeperWatcher zkw) { - try { - return listChildrenAndGetAsServerNames(zkw, zkw.rsZNode); - } catch (KeeperException e) { - LOG.warn("Cannot get peer's region server addresses", e); - return new ArrayList(0); - } + private List fetchSlavesAddresses(ZooKeeperWatcher zkw) + throws KeeperException { + return listChildrenAndGetAsServerNames(zkw, zkw.rsZNode); } /** @@ -318,10 +337,8 @@ return null; } - ZooKeeperWatcher zkw = new ZooKeeperWatcher(otherConf, - "connection to cluster: " + peerId, this.abortable); return new ReplicationPeer(otherConf, peerId, - otherClusterKey, zkw); + otherClusterKey); } /**