diff --git a/hbase-client/pom.xml b/hbase-client/pom.xml index 9df71e3..373be6e 100644 --- a/hbase-client/pom.xml +++ b/hbase-client/pom.xml @@ -93,6 +93,10 @@ com.google.protobuf protobuf-java + + com.netflix.curator + curator-recipes + org.apache.zookeeper zookeeper diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java index 89da357..715e478 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java @@ -46,6 +46,7 @@ import java.util.ArrayList; import java.util.LinkedList; import java.util.List; import java.util.Random; +import com.netflix.curator.framework.CuratorFramework; /** * A zookeeper that can handle 'recoverable' errors. @@ -74,8 +75,7 @@ import java.util.Random; @InterfaceStability.Evolving public class RecoverableZooKeeper { private static final Log LOG = LogFactory.getLog(RecoverableZooKeeper.class); - // the actual ZooKeeper client instance - volatile private ZooKeeper zk; + private volatile ZooKeeperManager zkManager; private final RetryCounterFactory retryCounterFactory; // An identifier of this process in the cluster private final String identifier; @@ -99,16 +99,18 @@ public class RecoverableZooKeeper { private static final int ID_LENGTH_SIZE = Bytes.SIZEOF_INT; public RecoverableZooKeeper(String quorumServers, int sessionTimeout, - Watcher watcher, int maxRetries, int retryIntervalMillis) + ZooKeeperWatcher watcher, int maxRetries, int retryIntervalMillis) throws IOException { this(quorumServers, sessionTimeout, watcher, maxRetries, retryIntervalMillis, null); } public RecoverableZooKeeper(String quorumServers, int sessionTimeout, - Watcher watcher, int maxRetries, int retryIntervalMillis, String identifier) + ZooKeeperWatcher watcher, int maxRetries, int retryIntervalMillis, String identifier) throws IOException { - this.zk = new ZooKeeper(quorumServers, sessionTimeout, watcher); + this.zkManager = new ZooKeeperManager(watcher, quorumServers, sessionTimeout, + maxRetries, retryIntervalMillis, false); + this.zkManager.connect(); this.retryCounterFactory = new RetryCounterFactory(maxRetries, retryIntervalMillis); @@ -130,12 +132,14 @@ public class RecoverableZooKeeper { public void reconnectAfterExpiration() throws IOException, InterruptedException { LOG.info("Closing dead ZooKeeper connection, session" + - " was: 0x"+Long.toHexString(zk.getSessionId())); - zk.close(); - this.zk = new ZooKeeper(this.quorumServers, - this.sessionTimeout, this.watcher); + " was: 0x"+Long.toHexString(getZk().getSessionId())); + zkManager.reconnect(); LOG.info("Recreated a ZooKeeper, session" + - " is: 0x"+Long.toHexString(zk.getSessionId())); + " is: 0x"+Long.toHexString(getZk().getSessionId())); + } + + private ZooKeeper getZk() { + return zkManager.getZooKeeper(); } /** @@ -149,7 +153,7 @@ public class RecoverableZooKeeper { boolean isRetry = false; // False for first attempt, true for all retries. while (true) { try { - zk.delete(path, version); + getZk().delete(path, version); return; } catch (KeeperException e) { switch (e.code()) { @@ -187,7 +191,7 @@ public class RecoverableZooKeeper { RetryCounter retryCounter = retryCounterFactory.create(); while (true) { try { - return zk.exists(path, watcher); + return getZk().exists(path, watcher); } catch (KeeperException e) { switch (e.code()) { case CONNECTIONLOSS: @@ -214,7 +218,7 @@ public class RecoverableZooKeeper { RetryCounter retryCounter = retryCounterFactory.create(); while (true) { try { - return zk.exists(path, watch); + return getZk().exists(path, watch); } catch (KeeperException e) { switch (e.code()) { case CONNECTIONLOSS: @@ -251,7 +255,7 @@ public class RecoverableZooKeeper { RetryCounter retryCounter = retryCounterFactory.create(); while (true) { try { - return zk.getChildren(path, watcher); + return getZk().getChildren(path, watcher); } catch (KeeperException e) { switch (e.code()) { case CONNECTIONLOSS: @@ -278,7 +282,7 @@ public class RecoverableZooKeeper { RetryCounter retryCounter = retryCounterFactory.create(); while (true) { try { - return zk.getChildren(path, watch); + return getZk().getChildren(path, watch); } catch (KeeperException e) { switch (e.code()) { case CONNECTIONLOSS: @@ -305,7 +309,7 @@ public class RecoverableZooKeeper { RetryCounter retryCounter = retryCounterFactory.create(); while (true) { try { - byte[] revData = zk.getData(path, watcher, stat); + byte[] revData = getZk().getData(path, watcher, stat); return this.removeMetaData(revData); } catch (KeeperException e) { switch (e.code()) { @@ -333,7 +337,7 @@ public class RecoverableZooKeeper { RetryCounter retryCounter = retryCounterFactory.create(); while (true) { try { - byte[] revData = zk.getData(path, watch, stat); + byte[] revData = getZk().getData(path, watch, stat); return this.removeMetaData(revData); } catch (KeeperException e) { switch (e.code()) { @@ -365,7 +369,7 @@ public class RecoverableZooKeeper { boolean isRetry = false; while (true) { try { - return zk.setData(path, newData, version); + return getZk().setData(path, newData, version); } catch (KeeperException e) { switch (e.code()) { case CONNECTIONLOSS: @@ -378,7 +382,7 @@ public class RecoverableZooKeeper { // try to verify whether the previous setData success or not try{ Stat stat = new Stat(); - byte[] revData = zk.getData(path, false, stat); + byte[] revData = getZk().getData(path, false, stat); if(Bytes.compareTo(revData, newData) == 0) { // the bad version is caused by previous successful setData return stat; @@ -400,8 +404,8 @@ public class RecoverableZooKeeper { } /** - *

* NONSEQUENTIAL create is idempotent operation. + *

* Retry before throwing exceptions. * But this function will not throw the NodeExist exception back to the * application. @@ -410,8 +414,6 @@ public class RecoverableZooKeeper { * But SEQUENTIAL is NOT idempotent operation. It is necessary to add * identifier to the path to verify, whether the previous one is successful * or not. - *

- * * @return Path */ public String create(String path, byte[] data, List acl, @@ -439,7 +441,7 @@ public class RecoverableZooKeeper { boolean isRetry = false; // False for first attempt, true for all retries. while (true) { try { - return zk.create(path, data, acl, createMode); + return getZk().create(path, data, acl, createMode); } catch (KeeperException e) { switch (e.code()) { case NODEEXISTS: @@ -447,7 +449,7 @@ public class RecoverableZooKeeper { // If the connection was lost, there is still a possibility that // we have successfully created the node at our previous attempt, // so we read the node and compare. - byte[] currentData = zk.getData(path, false, null); + byte[] currentData = getZk().getData(path, false, null); if (currentData != null && Bytes.compareTo(currentData, data) == 0) { // We successfully created a non-sequential node @@ -494,7 +496,7 @@ public class RecoverableZooKeeper { } } first = false; - return zk.create(newPath, data, acl, createMode); + return getZk().create(newPath, data, acl, createMode); } catch (KeeperException e) { switch (e.code()) { case CONNECTIONLOSS: @@ -548,7 +550,7 @@ public class RecoverableZooKeeper { Iterable multiOps = prepareZKMulti(ops); while (true) { try { - return zk.multi(multiOps); + return getZk().multi(multiOps); } catch (KeeperException e) { switch (e.code()) { case CONNECTIONLOSS: @@ -573,11 +575,11 @@ public class RecoverableZooKeeper { String parent = path.substring(0, lastSlashIdx); String nodePrefix = path.substring(lastSlashIdx+1); - List nodes = zk.getChildren(parent, false); + List nodes = getZk().getChildren(parent, false); List matching = filterByPrefix(nodes, nodePrefix); for (String node : matching) { String nodePath = parent + "/" + node; - Stat stat = zk.exists(nodePath, false); + Stat stat = getZk().exists(nodePath, false); if (stat != null) { return nodePath; } @@ -621,27 +623,36 @@ public class RecoverableZooKeeper { } public long getSessionId() { - return zk.getSessionId(); + return getZk().getSessionId(); } public void close() throws InterruptedException { - zk.close(); + zkManager.close(); } public States getState() { - return zk.getState(); + return getZk().getState(); } public ZooKeeper getZooKeeper() { - return zk; + return getZk(); } public byte[] getSessionPasswd() { - return zk.getSessionPasswd(); + return getZk().getSessionPasswd(); } public void sync(String path, AsyncCallback.VoidCallback cb, Object ctx) { - this.zk.sync(path, null, null); + this.getZk().sync(path, null, null); + } + + /** + * Returns the managed CuratorFramework instance. This function ensures that + * before returning the curator instance is initialized + * @return CuratorFramework instance + */ + public CuratorFramework getCuratorClient() { + return zkManager.getCuratorClient(); } /** diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java index de3eedd..c08e148 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java @@ -103,7 +103,7 @@ public class ZKUtil { * @return connection to zookeeper * @throws IOException if unable to connect to zk or config problem */ - public static RecoverableZooKeeper connect(Configuration conf, Watcher watcher) + public static RecoverableZooKeeper connect(Configuration conf, ZooKeeperWatcher watcher) throws IOException { Properties properties = ZKConfig.makeZKProps(conf); String ensemble = ZKConfig.getZKQuorumServersString(properties); @@ -111,13 +111,13 @@ public class ZKUtil { } public static RecoverableZooKeeper connect(Configuration conf, String ensemble, - Watcher watcher) + ZooKeeperWatcher watcher) throws IOException { return connect(conf, ensemble, watcher, null); } public static RecoverableZooKeeper connect(Configuration conf, String ensemble, - Watcher watcher, final String identifier) + ZooKeeperWatcher watcher, final String identifier) throws IOException { if(ensemble == null) { throw new IOException("Unable to determine ZooKeeper ensemble"); @@ -1172,7 +1172,7 @@ public class ZKUtil { createAndFailSilent(zkw, (CreateAndFailSilent)ZKUtilOp.createAndFailSilent(znode, data)); } - + private static void createAndFailSilent(ZooKeeperWatcher zkw, CreateAndFailSilent cafs) throws KeeperException { CreateRequest create = (CreateRequest)toZooKeeperOp(zkw, cafs).toRequestRecord(); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperManager.java new file mode 100644 index 0000000..7e5f1ae --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperManager.java @@ -0,0 +1,233 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.zookeeper; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooKeeper; + +import com.netflix.curator.ensemble.EnsembleProvider; +import com.netflix.curator.framework.CuratorFramework; +import com.netflix.curator.framework.CuratorFrameworkFactory; +import com.netflix.curator.retry.ExponentialBackoffRetry; +import com.netflix.curator.utils.ZookeeperFactory; + +/** + * Manages the ZooKeeper connection. + */ +@InterfaceAudience.Private +class ZooKeeperManager { + private static final Log LOG = LogFactory.getLog(ZooKeeperManager.class); + + private volatile CuratorFramework curatorClient; + private ManagedZooKeeperFactory zkFactory; + private Watcher zkWatcher; + + private String connectString; + private int sessionTimeout; + private int maxRetries; + private int retryIntervalMillis; + private boolean canBeReadOnly; + + /** + * Contains a set of watchers and forwards the events to all. + */ + static class WatcherSet implements Watcher { + private Watcher[] watchers; + + public WatcherSet(Watcher zkWatcher, Watcher curatorWatcher) { + this.watchers = new Watcher[] {zkWatcher, curatorWatcher}; + } + + @Override + public void process(WatchedEvent event) { + for (Watcher watcher: watchers) { + LOG.debug("WatcherSet: sending event:" + event + " to watcher: " + watcher); + watcher.process(event); + } + } + + Watcher getCuratorWatcher() { + return watchers[1]; + } + } + + static class ManagedZooKeeper extends ZooKeeper { + public ManagedZooKeeper(String connectString, int sessionTimeout, + Watcher watcher, boolean canBeReadOnly) throws IOException { + super(connectString, sessionTimeout, watcher, canBeReadOnly); + } + @Override + public synchronized void close() throws InterruptedException { + //close is no-op since we do not want curator to close the connection + } + public void shutDown() throws InterruptedException { + super.close(); + } + } + + /** + * In HBase, we want to manage our own ZooKeeper connection, and we do not want curator + * to recreate the connection. This class bridges bridges curator and our zk-management. + */ + static class ManagedZooKeeperFactory implements ZookeeperFactory { + class ManagedEnsembleProvider implements EnsembleProvider { + @Override + public void start() throws Exception { + } + @Override + public String getConnectionString() { + return String.valueOf(zk.getSessionId()); + } + @Override + public void close() throws IOException { + } + } + + ZooKeeperWatcher zkWatcher; + ManagedZooKeeper zk = null; + ManagedZooKeeper lastZk = null; + ManagedEnsembleProvider ensembleProvider; + WatcherSet doubleWatcher; + + ManagedZooKeeperFactory(ZooKeeperWatcher zkWatcher) { + this.zkWatcher = zkWatcher; + this.ensembleProvider = new ManagedEnsembleProvider(); + } + @Override + public synchronized ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher curatorWatcher, + boolean canBeReadOnly) throws Exception { + LOG.debug("Curator requested newZookeeper()"); + if (doubleWatcher != null) { + return zk; + } + // Wait for the new zookeeper to be initialized from HBase + while (this.zk == lastZk || this.zk == null) { + this.wait(); + } + this.doubleWatcher = new WatcherSet(zkWatcher, curatorWatcher); + this.zk.register(doubleWatcher); + + // HACK: send the last connection event to curator so that it initializes + // it's state + WatchedEvent lastEvent = zkWatcher.getLastConnectionEvent(); + if (LOG.isDebugEnabled()) { + LOG.debug("Sending last zookeeper event to curator: " + lastEvent); + } + if (lastEvent != null) { + curatorWatcher.process(lastEvent); + } + + return this.zk; + } + + public synchronized void close() throws InterruptedException { + if (this.zk != null) { + this.zk.shutDown(); + this.zk = null; + } + } + + public synchronized void connect(String connectString, int sessionTimeout, Watcher watcher, + boolean canBeReadOnly) throws IOException { + synchronized (this) { + Watcher w = watcher; + if (this.doubleWatcher != null) { + //assume the watcher from Curator does not change while the CuratorFramework is alive + w = this.doubleWatcher = new WatcherSet(watcher, doubleWatcher.getCuratorWatcher()); + } + this.lastZk = zk; + this.zk = new ManagedZooKeeper(connectString, sessionTimeout, w, canBeReadOnly); + this.notify(); + } + } + } + + public ZooKeeperManager(ZooKeeperWatcher zkWatcher, String connectString, int sessionTimeout, + int maxRetries, int retryIntervalMillis, boolean canBeReadOnly) { + this.zkWatcher = zkWatcher; + this.connectString = connectString; + this.sessionTimeout = sessionTimeout; + this.maxRetries = maxRetries; + this.retryIntervalMillis = retryIntervalMillis; + this.canBeReadOnly = canBeReadOnly; + this.zkFactory = new ManagedZooKeeperFactory(zkWatcher); + } + + /** + * Initializes the curator framework. We call this only when we need it, so that + * curator is not initialized from the client side. + */ + private synchronized void initializeCurator() { + if (this.curatorClient == null) { + this.curatorClient = CuratorFrameworkFactory.builder() + .sessionTimeoutMs(sessionTimeout) + .zookeeperFactory(zkFactory) + .ensembleProvider(zkFactory.ensembleProvider) + .retryPolicy(new ExponentialBackoffRetry(retryIntervalMillis, maxRetries)) + .canBeReadOnly(canBeReadOnly) + .build(); + this.curatorClient.start(); + } + } + + public void connect() throws IOException { + this.zkFactory.connect(connectString, sessionTimeout, zkWatcher, canBeReadOnly); + + } + + public void reconnect() throws IOException, InterruptedException { + zkFactory.close(); //we don't want to close the CuratorClient + connect(); + } + + /** + * Returns the managed CuratorFramework instance. This function ensures that + * before returning the curator instance is initialized + * @return CuratorFramework instance + */ + public CuratorFramework getCuratorClient() { + if (curatorClient == null) { + //note: since curatorClient is volatile, this double checked locking + // is not broken under JDK5+ (http://en.wikipedia.org/wiki/Double-checked_locking) + initializeCurator(); + } + return this.curatorClient; + } + + /** + * Returns the underlying ZooKeeper instance + * @return ZooKeeper object + */ + public ZooKeeper getZooKeeper() { + return this.zkFactory.zk; + } + + public void close() throws InterruptedException { + if (this.curatorClient != null) { + this.curatorClient.close(); + } + zkFactory.close(); + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java index d10dd7c..c0a8469 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java @@ -110,6 +110,9 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable { // znode containing the state of recovering regions public String recoveringRegionsZNode; + //Keep around the last even about the connection + private WatchedEvent lastConnectionEvent; + // Certain ZooKeeper nodes need to be world-readable public static final ArrayList CREATOR_ALL_AND_WORLD_READABLE = new ArrayList() { { @@ -362,6 +365,7 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable { * @param event */ private void connectionEvent(WatchedEvent event) { + this.lastConnectionEvent = event; switch(event.getState()) { case SyncConnected: // Now, this callback can be invoked before the this.zookeeper is set. @@ -495,4 +499,11 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable { return this.masterAddressZNode; } + /** + * Returns the last connection event that has been received. + * @return a WatchedEvent that has been received + */ + public WatchedEvent getLastConnectionEvent() { + return lastConnectionEvent; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java index ee8f34b..ad83067 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java @@ -46,7 +46,6 @@ import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.client.RetriesExhaustedException; import org.apache.hadoop.hbase.exceptions.ClockOutOfSyncException; -import org.apache.hadoop.hbase.exceptions.PleaseHoldException; import org.apache.hadoop.hbase.exceptions.YouAreDeadException; import org.apache.hadoop.hbase.exceptions.ZooKeeperConnectionException; import org.apache.hadoop.hbase.master.handler.MetaServerShutdownHandler; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKLeaderManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKLeaderManager.java index 99a214d..2452fb1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKLeaderManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKLeaderManager.java @@ -18,14 +18,22 @@ package org.apache.hadoop.hbase.zookeeper; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.util.Bytes; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -58,7 +66,7 @@ public class TestZKLeaderManager { } private static class MockLeader extends Thread implements Stoppable { - private boolean stopped; + private volatile boolean stopped; private ZooKeeperWatcher watcher; private ZKLeaderManager zkLeader; private AtomicBoolean master = new AtomicBoolean(false); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZooKeeperManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZooKeeperManager.java new file mode 100644 index 0000000..efa9ecf --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZooKeeperManager.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.zookeeper; + +import java.io.IOException; +import java.lang.reflect.Field; + +import junit.framework.Assert; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.MediumTests; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.netflix.curator.framework.CuratorFramework; +import com.netflix.curator.framework.imps.CuratorFrameworkState; + +@Category(MediumTests.class) +public class TestZooKeeperManager { + private static final Log LOG = LogFactory.getLog(TestZooKeeperManager.class); + + HBaseTestingUtility util; + + @Before + public void setUp() throws Exception { + util = new HBaseTestingUtility(); + util.startMiniZKCluster(1); + } + + @After + public void tearDown() throws IOException { + util.shutdownMiniZKCluster(); + } + + /** Returns the value of a private field */ + @SuppressWarnings("unchecked") + T getField(O obj, String fieldName) throws SecurityException, NoSuchFieldException, + IllegalArgumentException, IllegalAccessException { + Class clazz = obj.getClass(); + Field field = clazz.getDeclaredField(fieldName); + field.setAccessible(true); + return (T) field.get(obj); + } + + @Test + public void testCuratorIsNotInitializedIfNotUsed() throws Exception { + ZooKeeperWatcher zkWatcher = util.getZooKeeperWatcher(); + //do a simple operation + zkWatcher.getRecoverableZooKeeper().exists(zkWatcher.baseZNode, false); + + RecoverableZooKeeper zk = zkWatcher.getRecoverableZooKeeper(); + + ZooKeeperManager manager = getField(zk, "zkManager"); + CuratorFramework curatorClient = getField(manager, "curatorClient"); + Assert.assertNull(curatorClient); + } + + @Test(timeout=20000) + public void testCuratorInitializationRightAfterZkWatcher() throws Exception { + ZooKeeperWatcher zkWatcher = util.getZooKeeperWatcher(); + CuratorFramework curatorClient = zkWatcher.getRecoverableZooKeeper().getCuratorClient(); + Assert.assertEquals(CuratorFrameworkState.STARTED, curatorClient.getState()); + Assert.assertNotNull(curatorClient.checkExists().forPath("/")); + } + + @Test(timeout=20000) + public void testCuratorInitializationThenSessionExpiration() throws Exception { + ZooKeeperWatcher zkWatcher = util.getZooKeeperWatcher(); + Assert.assertNotNull(zkWatcher.getRecoverableZooKeeper().exists("/", false)); + CuratorFramework curatorClient = zkWatcher.getRecoverableZooKeeper().getCuratorClient(); + Assert.assertEquals(CuratorFrameworkState.STARTED, curatorClient.getState()); + curatorClient.checkExists().forPath("/"); + + util.expireSession(zkWatcher); + Assert.assertEquals(CuratorFrameworkState.STARTED, curatorClient.getState()); + zkWatcher.reconnectAfterExpiration(); + Assert.assertNotNull(curatorClient.checkExists().forPath("/")); + } + + @Test(timeout=20000) + public void testCuratorInitializationAfterSessionExpiration() throws Exception { + ZooKeeperWatcher zkWatcher = util.getZooKeeperWatcher(); + Assert.assertNotNull(zkWatcher.getRecoverableZooKeeper().exists("/", false)); + util.expireSession(zkWatcher); + + CuratorFramework curatorClient = zkWatcher.getRecoverableZooKeeper().getCuratorClient(); + Assert.assertEquals(CuratorFrameworkState.STARTED, curatorClient.getState()); + zkWatcher.reconnectAfterExpiration(); + Assert.assertNotNull(curatorClient.checkExists().forPath("/")); + } +} diff --git a/pom.xml b/pom.xml index e27cc1b..5271303 100644 --- a/pom.xml +++ b/pom.xml @@ -901,6 +901,7 @@ 1.9.0 2.4.1 1.0.1 + 1.2.5 0.9.0 3.4.5 0.0.1-SNAPSHOT @@ -1277,6 +1278,11 @@ ${stax-api.version}
+ com.netflix.curator + curator-recipes + ${curator.version} + + junit junit ${junit.version}