Index: src/test/java/org/apache/hadoop/hbase/replication/TestReplicationPeer.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/replication/TestReplicationPeer.java (revision 0) +++ src/test/java/org/apache/hadoop/hbase/replication/TestReplicationPeer.java (revision 0) @@ -0,0 +1,77 @@ +/* + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.replication; + +import junit.framework.Assert; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.zookeeper.KeeperException.SessionExpiredException; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestReplicationPeer { + + private static final Log LOG = LogFactory.getLog(TestReplicationPeer.class); + private static HBaseTestingUtility utility; + private static Configuration conf; + private static ReplicationPeer rp; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + conf = HBaseConfiguration.create(); + utility = new HBaseTestingUtility(conf); + utility.startMiniCluster(); + + rp = new ReplicationPeer(conf, "clusterKey", "clusterId"); + } + + @Test(timeout=300000) + public void testResetZooKeeperSession() throws Exception { + ZooKeeperWatcher zkw = rp.getZkw(); + + LOG.info("Expiring ReplicationPeer ZooKeeper session."); + utility.expireSession(zkw, null); + + try { + LOG.info("Attempting to use expired ReplicationPeer ZooKeeper session."); + // Trying to use the expired session to assert that it is indeed closed + zkw.getRecoverableZooKeeper().exists("/1/2", false); + } catch (SessionExpiredException k) { + rp.reloadZkWatcher(); + + zkw = rp.getZkw(); + + // Try to use the connection again + LOG.info("Attempting to use refreshed " + + "ReplicationPeer ZooKeeper session."); + zkw.getRecoverableZooKeeper().exists("/1/2", false); + + return; + } + + Assert.fail("ReplicationPeer ZooKeeper session was not properly expired."); + } + +} Index: src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java =================================================================== --- 
src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java (revision 1177090) +++ src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java (working copy) @@ -19,11 +19,15 @@ */ package org.apache.hadoop.hbase.replication; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; @@ -33,7 +37,8 @@ * of this class as it doesn't encapsulate any specific functionality e.g. * it's a container class. */ -public class ReplicationPeer { +public class ReplicationPeer implements Abortable { + private static final Log LOG = LogFactory.getLog(ReplicationPeer.class); private final String clusterKey; private final String id; @@ -49,14 +54,13 @@ * @param conf configuration object to this peer * @param key cluster key used to locate the peer * @param id string representation of this peer's identifier - * @param zkw zookeeper connection to the peer */ public ReplicationPeer(Configuration conf, String key, - String id, ZooKeeperWatcher zkw) { + String id) throws IOException { this.conf = conf; this.clusterKey = key; this.id = id; - this.zkw = zkw; + zkw = new ZooKeeperWatcher(conf, "connection to cluster: " + id, this); } /** @@ -116,4 +120,24 @@ public Configuration getConfiguration() { return conf; } + + @Override + public void abort(String why, Throwable e) { + LOG.warn("The ReplicationPeer corresponding to peer " + clusterKey + + " was aborted for the following reason(s): " + why, e); + } + + public void reloadZkWatcher() throws IOException { + LOG.info("Refreshing ZookeeperWatcher for peer " + clusterKey); + zkw.close(); + zkw = new ZooKeeperWatcher(conf, + "connection to cluster: " + id, this); + } + + 
@Override + public boolean isAborted() { + // Currently the replication peer is never "Aborted", we just log when the + // abort method is called. + return false; + } } Index: src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (revision 1177090) +++ src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (working copy) @@ -210,10 +210,9 @@ /** * Select a number of peers at random using the ratio. Mininum 1. */ - private void chooseSinks() throws KeeperException { + private void chooseSinks() { this.currentPeers.clear(); - List addresses = - this.zkHelper.getSlavesAddresses(peerId); + List addresses = this.zkHelper.getSlavesAddresses(peerId); Set setOfAddr = new HashSet(); int nbPeers = (int) (Math.ceil(addresses.size() * ratio)); LOG.info("Getting " + nbPeers + @@ -435,8 +434,6 @@ Thread.sleep(this.sleepForRetries); } catch (InterruptedException e) { LOG.error("Interrupted while trying to connect to sinks", e); - } catch (KeeperException e) { - LOG.error("Error talking to zookeeper, retrying", e); } } } @@ -627,10 +624,7 @@ } while (this.isActive() && down ); } catch (InterruptedException e) { LOG.debug("Interrupted while trying to contact the peer cluster"); - } catch (KeeperException e) { - LOG.error("Error talking to zookeeper, retrying", e); } - } } } Index: src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (revision 1177090) +++ src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (working copy) @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; 
import java.util.Map; @@ -43,6 +44,8 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.ConnectionLossException; +import org.apache.zookeeper.KeeperException.SessionExpiredException; /** * This class serves as a helper for all things related to zookeeper @@ -210,8 +213,7 @@ * @param peerClusterId (byte) the cluster to interrogate * @return addresses of all region servers */ - public List getSlavesAddresses(String peerClusterId) - throws KeeperException { + public List getSlavesAddresses(String peerClusterId) { if (this.peerClusters.size() == 0) { return new ArrayList(0); } @@ -219,7 +221,27 @@ if (peer == null) { return new ArrayList(0); } - peer.setRegionServers(fetchSlavesAddresses(peer.getZkw())); + + List addresses; + try { + addresses = fetchSlavesAddresses(peer.getZkw()); + } catch (KeeperException ke) { + if (ke instanceof ConnectionLossException + || ke instanceof SessionExpiredException) { + LOG.warn( + "Lost the ZooKeeper connection for peer " + peer.getClusterKey(), + ke); + try { + peer.reloadZkWatcher(); + } catch(IOException io) { + LOG.warn( + "Creation of ZookeeperWatcher failed for peer " + + peer.getClusterKey(), io); + } + } + addresses = Collections.emptyList(); + } + peer.setRegionServers(addresses); return peer.getRegionServers(); } @@ -229,13 +251,9 @@ * @return list of region server addresses or an empty list if the slave * is unavailable */ - private List fetchSlavesAddresses(ZooKeeperWatcher zkw) { - try { - return listChildrenAndGetAsServerNames(zkw, zkw.rsZNode); - } catch (KeeperException e) { - LOG.warn("Cannot get peer's region server addresses", e); - return new ArrayList(0); - } + private List fetchSlavesAddresses(ZooKeeperWatcher zkw) + throws KeeperException { + return listChildrenAndGetAsServerNames(zkw, zkw.rsZNode); } /** @@ -318,10 +336,8 @@ return null; } - 
ZooKeeperWatcher zkw = new ZooKeeperWatcher(otherConf, - "connection to cluster: " + peerId, this.abortable); return new ReplicationPeer(otherConf, peerId, - otherClusterKey, zkw); + otherClusterKey); } /**