Index: src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java (revision 1171163)
+++ src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java (working copy)
@@ -19,13 +19,16 @@
  */
 package org.apache.hadoop.hbase.replication;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.mortbay.log.Log;
 
 /**
  * This class acts as a wrapper for all the objects used to identify and
@@ -33,7 +36,7 @@
  * of this class as it doesn't encapsulate any specific functionality e.g.
  * it's a container class.
  */
-public class ReplicationPeer {
+public class ReplicationPeer implements Abortable {
 
   private final String clusterKey;
   private final String id;
@@ -52,11 +55,12 @@
-   * @param zkw zookeeper connection to the peer
+   * @throws IOException if the ZooKeeperWatcher for the peer cannot be created
    */
   public ReplicationPeer(Configuration conf, String key,
-      String id, ZooKeeperWatcher zkw) {
+      String id) throws IOException {
     this.conf = conf;
     this.clusterKey = key;
     this.id = id;
-    this.zkw = zkw;
+    zkw = new ZooKeeperWatcher(conf,
+        "connection to cluster: " + id, this);
   }
 
   /**
@@ -116,4 +120,32 @@
   public Configuration getConfiguration() {
     return conf;
   }
+
+  /**
+   * Abortable callback handed to the ZooKeeperWatcher created by this peer.
+   * The reason is only logged; nothing is shut down here.
+   * NOTE(review): Log is org.mortbay.log.Log (Jetty's logger) - confirm this
+   * is intended rather than the logger used elsewhere in the project.
+   * @param why reason for the abort
+   * @param e throwable associated with the abort
+   */
+  @Override
+  public void abort(String why, Throwable e) {
+    Log.warn(why, e);
+  }
+
+  /**
+   * Closes the current ZooKeeper connection to the peer, if there is one,
+   * and replaces it with a new one.
+   * @throws IOException if the new ZooKeeperWatcher cannot be created
+   */
+  public void reloadZkWatcher() throws IOException {
+    Log.info("Refreshing ZookeeperWatcher");
+    if (zkw != null) {
+      // Release the previous watcher before dropping the reference to it.
+      zkw.close();
+    }
+    zkw = new ZooKeeperWatcher(conf,
+        "connection to cluster: " + id, this);
+  }
 }
Index: src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (revision 1171163)
+++ src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (working copy)
@@ -52,12 +52,15 @@
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.replication.ReplicationPeer;
 import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.zookeeper.ClusterId;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.ConnectionLossException;
+import org.apache.zookeeper.KeeperException.SessionExpiredException;
 
 /**
  * Class that handles the source of a replication stream.
@@ -210,10 +213,32 @@
   /**
-   * Select a number of peers at random using the ratio. Mininum 1.
+   * Select a number of peers at random using the ratio. Minimum 1.
+   * If the peer's region servers cannot be listed from ZooKeeper, no sink
+   * is selected; on connection loss or session expiry the peer's
+   * ZooKeeperWatcher is also recreated.
    */
-  private void chooseSinks() throws KeeperException {
+  private void chooseSinks() {
     this.currentPeers.clear();
-    List<ServerName> addresses =
-      this.zkHelper.getSlavesAddresses(peerId);
+    List<ServerName> addresses;
+    try {
+      addresses = this.zkHelper.getSlavesAddresses(peerId);
+    } catch (KeeperException ke) {
+      LOG.warn("Lost peer's ZooKeeper connection", ke);
+      if (ke instanceof ConnectionLossException || ke instanceof SessionExpiredException) {
+        ReplicationPeer myPeer = zkHelper.getPeerClusters().get(peerId);
+        if (myPeer == null) {
+          // Nothing to reload if the peer is not in the map.
+          LOG.warn("Peer " + peerId + " not found, " +
+              "cannot reload its ZooKeeper connection");
+        } else {
+          try {
+            myPeer.reloadZkWatcher();
+          } catch (IOException io) {
+            LOG.info("Creating ZookeeperWatcher failed", io);
+          }
+        }
+      }
+      return;
+    }
     Set<ServerName> setOfAddr = new HashSet<ServerName>();
     int nbPeers = (int) (Math.ceil(addresses.size() * ratio));
     LOG.info("Getting " + nbPeers +
@@ -434,8 +459,6 @@
         Thread.sleep(this.sleepForRetries);
       } catch (InterruptedException e) {
         LOG.error("Interrupted while trying to connect to sinks", e);
-      } catch (KeeperException e) {
-        LOG.error("Error talking to zookeeper, retrying", e);
       }
     }
   }
@@ -625,8 +648,6 @@
           } while (!this.stopper.isStopped() && down);
         } catch (InterruptedException e) {
           LOG.debug("Interrupted while trying to contact the peer cluster");
-        } catch (KeeperException e) {
-          LOG.error("Error talking to zookeeper, retrying", e);
         }
       }
 
Index: src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (revision 1171163)
+++ src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (working copy)
@@ -229,13 +229,9 @@
-   * @return list of region server addresses or an empty list if the slave
-   * is unavailable
+   * @return list of region server addresses
+   * @throws KeeperException if the peer's ZooKeeper cannot be read
    */
-  private List<ServerName> fetchSlavesAddresses(ZooKeeperWatcher zkw) {
-    try {
-      return listChildrenAndGetAsServerNames(zkw, zkw.rsZNode);
-    } catch (KeeperException e) {
-      LOG.warn("Cannot get peer's region server addresses", e);
-      return new ArrayList<ServerName>(0);
-    }
+  private List<ServerName> fetchSlavesAddresses(ZooKeeperWatcher zkw)
+      throws KeeperException {
+    return listChildrenAndGetAsServerNames(zkw, zkw.rsZNode);
   }
 
   /**
@@ -318,10 +314,8 @@
       return null;
     }
 
-    ZooKeeperWatcher zkw = new ZooKeeperWatcher(otherConf,
-        "connection to cluster: " + peerId, this.abortable);
     return new ReplicationPeer(otherConf, peerId,
-        otherClusterKey, zkw);
+        otherClusterKey);
   }
 
   /**