diff --git src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java index d977830..c3febd9 100644 --- src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java +++ src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.replication.ReplicationZookeeper.PeerState; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.RetryCounter; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; @@ -157,7 +158,7 @@ public class ReplicationPeer implements Abortable { @Override public void abort(String why, Throwable e) { - LOG.fatal("The ReplicationPeer coresponding to peer " + clusterKey + LOG.fatal("The ReplicationPeer corresponding to peer " + clusterKey + " was aborted for the following reason(s):" + why, e); } @@ -167,8 +168,11 @@ public class ReplicationPeer implements Abortable { */ public void reloadZkWatcher() throws IOException { if (zkw != null) zkw.close(); + try { + zkw = new ZooKeeperWatcher(conf, "connection to cluster: " + id, this); + } catch (IOException e) { + LOG.warn("Cannot connect to the peer cluster " + id + ", will keep retrying in the background", e); new RetryZookeeperWatcher(this).start(); + } } @Override @@ -196,4 +200,53 @@ public class ReplicationPeer implements Abortable { } } } + + public class RetryZookeeperWatcher implements Runnable { + private final ReplicationPeer peer; + private Thread thread; + private boolean running = false; + private final long sleepForRetries = conf.getLong("replication.source.sleepforretries", 1000); + private final int maxRetriesMultiplier = conf.getInt("replication.source.maxretriesmultiplier", 10); + + public RetryZookeeperWatcher(ReplicationPeer peer) { 
+ this.peer = peer; + } + + public synchronized void start() { + if (thread != null) { + return; + } + + thread = new Thread(this); + thread.start(); + } + + @Override + public void run() { + int attempt = 1; + while (true) { + try { + attempt++; + zkw = new ZooKeeperWatcher(conf, "connection to cluster: " + id, peer); + break; + } catch (Exception e) { + LOG.warn("Exception with zookeeper watcher", e); + if (!sleepForRetries("Cannot contact the peer's zk ensemble", attempt)) { + attempt = 1; + } + } + } + thread = null; + } + + private boolean sleepForRetries(String msg, int sleepMultiplier) { + try { + LOG.debug(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier); + Thread.sleep(this.sleepForRetries * sleepMultiplier); + } catch (InterruptedException e) { + LOG.debug("Interrupted while sleeping between retries"); Thread.currentThread().interrupt(); + } + return sleepMultiplier < maxRetriesMultiplier; + } + } } diff --git src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 4c6c897..8858a55 100644 --- src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -158,7 +158,7 @@ public class ReplicationSourceManager { } synchronized (this.hlogsById) { SortedSet hlogs = this.hlogsById.get(id); - if (!queueRecovered && hlogs.first() != key) { + if (!queueRecovered && !hlogs.first().equals(key)) { SortedSet hlogSet = hlogs.headSet(key); LOG.info("Removing " + hlogSet.size() + " logs in the list: " + hlogSet); diff --git src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java index 1672d97..d9975d2 100644 --- src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java +++ 
src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java @@ -19,11 +19,8 @@ */ package org.apache.hadoop.hbase.replication; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - import java.io.IOException; +import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -37,13 +34,18 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.zookeeper.ZKUtil; +import org.apache.zookeeper.ZooKeeper; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; +import static org.junit.Assert.*; + @Category(LargeTests.class) public class TestMultiSlaveReplication { @@ -68,6 +70,7 @@ public class TestMultiSlaveReplication { private static final byte[] noRepfamName = Bytes.toBytes("norep"); private static HTableDescriptor table; + private static ZooKeeperWatcher watcher; @BeforeClass public static void setUpBeforeClass() throws Exception { @@ -89,7 +92,7 @@ public class TestMultiSlaveReplication { utility1 = new HBaseTestingUtility(conf1); utility1.startMiniZKCluster(); MiniZooKeeperCluster miniZK = utility1.getZkCluster(); - new ZooKeeperWatcher(conf1, "cluster1", null, true); + watcher = new ZooKeeperWatcher(conf1, "cluster1", null, true); conf2 = new Configuration(conf1); conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2"); @@ -115,7 +118,7 @@ public class TestMultiSlaveReplication { @Test(timeout=300000) public void testMultiSlaveReplication() throws Exception { 
- LOG.info("testCyclicReplication"); + LOG.info("testMultiSlaveReplication"); MiniHBaseCluster master = utility1.startMiniCluster(); utility2.startMiniCluster(); utility3.startMiniCluster(); @@ -174,7 +177,7 @@ public class TestMultiSlaveReplication { // Even if the log was rolled in the middle of the replication // "row" is still replication. checkRow(row, 1, htable2); - // Replication thread of cluster 2 may be sleeping, and since row2 is not there in it, + // Replication thread of cluster 2 may be sleeping, and since row2 is not there in it, // we should wait before checking. checkWithWait(row, 1, htable3); @@ -186,6 +189,62 @@ public class TestMultiSlaveReplication { utility2.shutdownMiniCluster(); utility1.shutdownMiniCluster(); } + + @Test(timeout=300000) + public void testUnknownHostException() throws Exception { + LOG.info("testUnknownHostException"); + MiniHBaseCluster master = utility1.startMiniCluster(); + utility2.startMiniCluster(); + utility3.startMiniCluster(); + ReplicationAdmin admin1 = new ReplicationAdmin(conf1); + + new HBaseAdmin(conf1).createTable(table); + new HBaseAdmin(conf2).createTable(table); + new HBaseAdmin(conf3).createTable(table); + + HTable htable1 = new HTable(conf1, tableName); + htable1.setWriteBufferSize(1024); + HTable htable2 = new HTable(conf2, tableName); + htable2.setWriteBufferSize(1024); + HTable htable3 = new HTable(conf3, tableName); + htable3.setWriteBufferSize(1024); + + admin1.addPeer("1", utility2.getClusterKey()); + + // put "row" and wait 'til it got around, then delete + putAndWait(row, famName, htable1, htable2); + deleteAndWait(row, htable1, htable2); + // check it wasn't replicated to cluster 3 + checkRow(row, 0, htable3); + + putAndWait(row2, famName, htable1, htable2); + + // now roll the region server's logs + new HBaseAdmin(conf1).rollHLogWriter(master.getRegionServer(0).getServerName().toString()); + // after the log was rolled put a new row + putAndWait(row3, famName, htable1, htable2); + + 
admin1.addPeer("2", "some.unknown.host:2181:2"); + + // put a row, check it was replicated to cluster 2 + putAndWait(row1, famName, htable1, htable2); + + // delete and verify + deleteAndWait(row1, htable1, htable2); + + + // now roll the logs again + new HBaseAdmin(conf1).rollHLogWriter(master.getRegionServer(0).getServerName().toString()); + + // verify even though we can't reach peer 2, that we still keep the logs + ZooKeeper zooKeeper = watcher.getRecoverableZooKeeper().getZooKeeper(); + List<String> children = zooKeeper.getChildren("/1/replication/rs/" + master.getRegionServer(0).getServerName().toString() + "/2", false); + assertTrue(children.size() > 0); + + utility3.shutdownMiniCluster(); + utility2.shutdownMiniCluster(); + utility1.shutdownMiniCluster(); + } private void checkWithWait(byte[] row, int count, HTable table) throws Exception { Get get = new Get(row); diff --git src/test/java/org/apache/hadoop/hbase/replication/TestReplicationZookeeper.java src/test/java/org/apache/hadoop/hbase/replication/TestReplicationZookeeper.java index 553b5cd..b7e60e0 100644 --- src/test/java/org/apache/hadoop/hbase/replication/TestReplicationZookeeper.java +++ src/test/java/org/apache/hadoop/hbase/replication/TestReplicationZookeeper.java @@ -28,6 +28,8 @@ import org.apache.hadoop.hbase.MediumTests; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.catalog.CatalogTracker; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.zookeeper.KeeperException; import org.junit.AfterClass; @@ -75,6 +77,26 @@ public class TestReplicationZookeeper { assertEquals(0, repZk.getSlavesAddresses("1").size()); } + @Test + public void testAddingAnInvalidPeerQueuesLogs() throws Exception { + addPeerAndEnable(1); + repZk.connectToPeer("1"); + assertEquals(1, repZk.getPeerClusters().keySet().size()); + } + + private 
static void addPeerAndEnable(final int peer) throws KeeperException { + ZKUtil.createWithParents(zkw, "/hbase/replication"); + ZKUtil.createWithParents(zkw, "/hbase/replication/peers/" + peer); + ZKUtil.setData(zkw, "/hbase/replication/peers/" + peer, + Bytes.toBytes("someZookeeperAddress" + ":" + + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":/" + peer)); + ZKUtil.createWithParents(zkw, "/hbase/replication/peers/" + peer + "/peer-state"); + ZKUtil.setData(zkw, "/hbase/replication/peers/" + peer + "/peer-state", + Bytes.toBytes(ReplicationZookeeper.PeerState.ENABLED.name())); + ZKUtil.createWithParents(zkw, "/hbase/replication/state"); + ZKUtil.setData(zkw, "/hbase/replication/state", Bytes.toBytes("true")); + } + static class DummyServer implements Server { @Override