From 37e2dd420b091c3f9c2aa82fdca4f5d6fb10f4a3 Mon Sep 17 00:00:00 2001 From: Pankaj_Kumar Date: Mon, 9 Nov 2015 11:33:34 +0800 Subject: [PATCH] HBASE-14498, Master stuck in infinite loop when all Zookeeper servers are unreachable. Change-Id: I3d5604a38894e62e2862f2ee7aa0dec987a2ea87 --- .../hadoop/hbase/zookeeper/ZooKeeperWatcher.java | 39 +++++++++++++++++- .../hbase/zookeeper/TestZooKeeperWatcher.java | 46 +++++++++++++++++++++- 2 files changed, 82 insertions(+), 3 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java index f7a2175..25bc247 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -74,7 +75,7 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable { private RecoverableZooKeeper recoverableZooKeeper; // abortable in case of zk failure - protected Abortable abortable; + protected final Abortable abortable; // Used if abortable is null private boolean aborted = false; @@ -85,6 +86,10 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable { // Used by ZKUtil:waitForZKConnectionIfAuthenticating to wait for SASL // negotiation to complete public CountDownLatch saslLatch = new CountDownLatch(1); + + // Connection timeout on disconnect event + private long connWaitTimeOut; + private AtomicBoolean isConnected = new AtomicBoolean(false); // node names @@ -167,6 +172,9 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable { this.identifier = identifier + "0x0"; this.abortable = abortable; setNodeNames(conf); + // On Disconnected event a thread will wait for sometime (2/3 of zookeeper.session.timeout), + // it will abort the process if no SyncConnected event reported by the time. + connWaitTimeOut = this.conf.getLong("zookeeper.session.timeout", 90000) * 2 / 3; this.recoverableZooKeeper = ZKUtil.connect(conf, quorum, this, identifier); if (canCreateBaseZNode) { createBaseZNodes(); @@ -596,6 +604,7 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable { private void connectionEvent(WatchedEvent event) { switch(event.getState()) { case SyncConnected: + isConnected.set(true); // Now, this callback can be invoked before the this.zookeeper is set. // Wait a little while. long finished = System.currentTimeMillis() + @@ -625,7 +634,33 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable { // Abort the server if Disconnected or Expired case Disconnected: - LOG.debug(prefix("Received Disconnected from ZooKeeper, ignoring")); + LOG.debug("Received Disconnected from ZooKeeper."); + isConnected.set(false); + + Thread t = new Thread() { + public void run() { + long startTime = System.currentTimeMillis(); + while (System.currentTimeMillis() - startTime < connWaitTimeOut) { + if (isConnected.get()) { + LOG.debug("Client got reconnected to zookeeper."); + return; + } + try { + Thread.sleep(100); + } catch (InterruptedException e) { + break; + } + } + + if (!isConnected.get() && abortable != null) { + String msg = + prefix("Couldn't connect to ZooKeeper after waiting " + connWaitTimeOut + + " ms, aborting"); + abortable.abort(msg, new KeeperException.ConnectionLossException()); + } + }; + }; + t.start(); break; case Expired: diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZooKeeperWatcher.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZooKeeperWatcher.java index 10a3816..fd68f00 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZooKeeperWatcher.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZooKeeperWatcher.java @@ -22,15 +22,22 @@ import static org.junit.Assert.*; import java.io.IOException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; import org.junit.Test; import org.junit.experimental.categories.Category; @Category({SmallTests.class}) public class TestZooKeeperWatcher { - + private final static Log LOG = LogFactory.getLog(TestZooKeeperWatcher.class); + @Test public void testIsClientReadable() throws ZooKeeperConnectionException, IOException { ZooKeeperWatcher watcher = new ZooKeeperWatcher(HBaseConfiguration.create(), @@ -56,5 +63,42 @@ public class TestZooKeeperWatcher { watcher.close(); } + + @Test + public void testConnectionEvent() throws ZooKeeperConnectionException, IOException { + long zkSessionTimeout = 15000l; + Configuration conf = HBaseConfiguration.create(); + conf.set("zookeeper.session.timeout", "15000"); + + Abortable abortable = new Abortable() { + boolean aborted = false; + + @Override + public void abort(String why, Throwable e) { + aborted = true; + LOG.error(why, e); + } + + @Override + public boolean isAborted() { + return aborted; + } + }; + ZooKeeperWatcher watcher = new ZooKeeperWatcher(conf, "testConnectionEvent", abortable, false); + WatchedEvent event = + new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.Disconnected, null); + + long startTime = System.currentTimeMillis(); + while (!abortable.isAborted() && (System.currentTimeMillis() - startTime < zkSessionTimeout)) { + watcher.process(event); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + } + } + + assertTrue(abortable.isAborted()); + watcher.close(); + } } -- 2.6.1.windows.1