org.apache.zookeeper
zookeeper
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java
index 89da357..715e478 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java
@@ -46,6 +46,7 @@ import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
+import com.netflix.curator.framework.CuratorFramework;
/**
* A zookeeper that can handle 'recoverable' errors.
@@ -74,8 +75,7 @@ import java.util.Random;
@InterfaceStability.Evolving
public class RecoverableZooKeeper {
private static final Log LOG = LogFactory.getLog(RecoverableZooKeeper.class);
- // the actual ZooKeeper client instance
- volatile private ZooKeeper zk;
+ private volatile ZooKeeperManager zkManager;
private final RetryCounterFactory retryCounterFactory;
// An identifier of this process in the cluster
private final String identifier;
@@ -99,16 +99,18 @@ public class RecoverableZooKeeper {
private static final int ID_LENGTH_SIZE = Bytes.SIZEOF_INT;
public RecoverableZooKeeper(String quorumServers, int sessionTimeout,
- Watcher watcher, int maxRetries, int retryIntervalMillis)
+ ZooKeeperWatcher watcher, int maxRetries, int retryIntervalMillis)
throws IOException {
this(quorumServers, sessionTimeout, watcher, maxRetries, retryIntervalMillis,
null);
}
public RecoverableZooKeeper(String quorumServers, int sessionTimeout,
- Watcher watcher, int maxRetries, int retryIntervalMillis, String identifier)
+ ZooKeeperWatcher watcher, int maxRetries, int retryIntervalMillis, String identifier)
throws IOException {
- this.zk = new ZooKeeper(quorumServers, sessionTimeout, watcher);
+ this.zkManager = new ZooKeeperManager(watcher, quorumServers, sessionTimeout,
+ maxRetries, retryIntervalMillis, false);
+ this.zkManager.connect();
this.retryCounterFactory =
new RetryCounterFactory(maxRetries, retryIntervalMillis);
@@ -130,12 +132,14 @@ public class RecoverableZooKeeper {
public void reconnectAfterExpiration()
throws IOException, InterruptedException {
LOG.info("Closing dead ZooKeeper connection, session" +
- " was: 0x"+Long.toHexString(zk.getSessionId()));
- zk.close();
- this.zk = new ZooKeeper(this.quorumServers,
- this.sessionTimeout, this.watcher);
+ " was: 0x"+Long.toHexString(getZk().getSessionId()));
+ zkManager.reconnect();
LOG.info("Recreated a ZooKeeper, session" +
- " is: 0x"+Long.toHexString(zk.getSessionId()));
+ " is: 0x"+Long.toHexString(getZk().getSessionId()));
+ }
+
+ private ZooKeeper getZk() {
+ return zkManager.getZooKeeper();
}
/**
@@ -149,7 +153,7 @@ public class RecoverableZooKeeper {
boolean isRetry = false; // False for first attempt, true for all retries.
while (true) {
try {
- zk.delete(path, version);
+ getZk().delete(path, version);
return;
} catch (KeeperException e) {
switch (e.code()) {
@@ -187,7 +191,7 @@ public class RecoverableZooKeeper {
RetryCounter retryCounter = retryCounterFactory.create();
while (true) {
try {
- return zk.exists(path, watcher);
+ return getZk().exists(path, watcher);
} catch (KeeperException e) {
switch (e.code()) {
case CONNECTIONLOSS:
@@ -214,7 +218,7 @@ public class RecoverableZooKeeper {
RetryCounter retryCounter = retryCounterFactory.create();
while (true) {
try {
- return zk.exists(path, watch);
+ return getZk().exists(path, watch);
} catch (KeeperException e) {
switch (e.code()) {
case CONNECTIONLOSS:
@@ -251,7 +255,7 @@ public class RecoverableZooKeeper {
RetryCounter retryCounter = retryCounterFactory.create();
while (true) {
try {
- return zk.getChildren(path, watcher);
+ return getZk().getChildren(path, watcher);
} catch (KeeperException e) {
switch (e.code()) {
case CONNECTIONLOSS:
@@ -278,7 +282,7 @@ public class RecoverableZooKeeper {
RetryCounter retryCounter = retryCounterFactory.create();
while (true) {
try {
- return zk.getChildren(path, watch);
+ return getZk().getChildren(path, watch);
} catch (KeeperException e) {
switch (e.code()) {
case CONNECTIONLOSS:
@@ -305,7 +309,7 @@ public class RecoverableZooKeeper {
RetryCounter retryCounter = retryCounterFactory.create();
while (true) {
try {
- byte[] revData = zk.getData(path, watcher, stat);
+ byte[] revData = getZk().getData(path, watcher, stat);
return this.removeMetaData(revData);
} catch (KeeperException e) {
switch (e.code()) {
@@ -333,7 +337,7 @@ public class RecoverableZooKeeper {
RetryCounter retryCounter = retryCounterFactory.create();
while (true) {
try {
- byte[] revData = zk.getData(path, watch, stat);
+ byte[] revData = getZk().getData(path, watch, stat);
return this.removeMetaData(revData);
} catch (KeeperException e) {
switch (e.code()) {
@@ -365,7 +369,7 @@ public class RecoverableZooKeeper {
boolean isRetry = false;
while (true) {
try {
- return zk.setData(path, newData, version);
+ return getZk().setData(path, newData, version);
} catch (KeeperException e) {
switch (e.code()) {
case CONNECTIONLOSS:
@@ -378,7 +382,7 @@ public class RecoverableZooKeeper {
// try to verify whether the previous setData success or not
try{
Stat stat = new Stat();
- byte[] revData = zk.getData(path, false, stat);
+ byte[] revData = getZk().getData(path, false, stat);
if(Bytes.compareTo(revData, newData) == 0) {
// the bad version is caused by previous successful setData
return stat;
@@ -400,8 +404,8 @@ public class RecoverableZooKeeper {
}
/**
- *
* NONSEQUENTIAL create is idempotent operation.
+ *
* Retry before throwing exceptions.
* But this function will not throw the NodeExist exception back to the
* application.
@@ -410,8 +414,6 @@ public class RecoverableZooKeeper {
* But SEQUENTIAL is NOT idempotent operation. It is necessary to add
* identifier to the path to verify, whether the previous one is successful
* or not.
- *
- *
* @return Path
*/
public String create(String path, byte[] data, List acl,
@@ -439,7 +441,7 @@ public class RecoverableZooKeeper {
boolean isRetry = false; // False for first attempt, true for all retries.
while (true) {
try {
- return zk.create(path, data, acl, createMode);
+ return getZk().create(path, data, acl, createMode);
} catch (KeeperException e) {
switch (e.code()) {
case NODEEXISTS:
@@ -447,7 +449,7 @@ public class RecoverableZooKeeper {
// If the connection was lost, there is still a possibility that
// we have successfully created the node at our previous attempt,
// so we read the node and compare.
- byte[] currentData = zk.getData(path, false, null);
+ byte[] currentData = getZk().getData(path, false, null);
if (currentData != null &&
Bytes.compareTo(currentData, data) == 0) {
// We successfully created a non-sequential node
@@ -494,7 +496,7 @@ public class RecoverableZooKeeper {
}
}
first = false;
- return zk.create(newPath, data, acl, createMode);
+ return getZk().create(newPath, data, acl, createMode);
} catch (KeeperException e) {
switch (e.code()) {
case CONNECTIONLOSS:
@@ -548,7 +550,7 @@ public class RecoverableZooKeeper {
Iterable multiOps = prepareZKMulti(ops);
while (true) {
try {
- return zk.multi(multiOps);
+ return getZk().multi(multiOps);
} catch (KeeperException e) {
switch (e.code()) {
case CONNECTIONLOSS:
@@ -573,11 +575,11 @@ public class RecoverableZooKeeper {
String parent = path.substring(0, lastSlashIdx);
String nodePrefix = path.substring(lastSlashIdx+1);
- List nodes = zk.getChildren(parent, false);
+ List nodes = getZk().getChildren(parent, false);
List matching = filterByPrefix(nodes, nodePrefix);
for (String node : matching) {
String nodePath = parent + "/" + node;
- Stat stat = zk.exists(nodePath, false);
+ Stat stat = getZk().exists(nodePath, false);
if (stat != null) {
return nodePath;
}
@@ -621,27 +623,36 @@ public class RecoverableZooKeeper {
}
public long getSessionId() {
- return zk.getSessionId();
+ return getZk().getSessionId();
}
public void close() throws InterruptedException {
- zk.close();
+ zkManager.close();
}
public States getState() {
- return zk.getState();
+ return getZk().getState();
}
public ZooKeeper getZooKeeper() {
- return zk;
+ return getZk();
}
public byte[] getSessionPasswd() {
- return zk.getSessionPasswd();
+ return getZk().getSessionPasswd();
}
public void sync(String path, AsyncCallback.VoidCallback cb, Object ctx) {
- this.zk.sync(path, null, null);
+ this.getZk().sync(path, null, null);
+ }
+
+ /**
+ * Returns the managed CuratorFramework instance. This function ensures that
+ * before returning the curator instance is initialized
+ * @return CuratorFramework instance
+ */
+ public CuratorFramework getCuratorClient() {
+ return zkManager.getCuratorClient();
}
/**
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
index de3eedd..c08e148 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
@@ -103,7 +103,7 @@ public class ZKUtil {
* @return connection to zookeeper
* @throws IOException if unable to connect to zk or config problem
*/
- public static RecoverableZooKeeper connect(Configuration conf, Watcher watcher)
+ public static RecoverableZooKeeper connect(Configuration conf, ZooKeeperWatcher watcher)
throws IOException {
Properties properties = ZKConfig.makeZKProps(conf);
String ensemble = ZKConfig.getZKQuorumServersString(properties);
@@ -111,13 +111,13 @@ public class ZKUtil {
}
public static RecoverableZooKeeper connect(Configuration conf, String ensemble,
- Watcher watcher)
+ ZooKeeperWatcher watcher)
throws IOException {
return connect(conf, ensemble, watcher, null);
}
public static RecoverableZooKeeper connect(Configuration conf, String ensemble,
- Watcher watcher, final String identifier)
+ ZooKeeperWatcher watcher, final String identifier)
throws IOException {
if(ensemble == null) {
throw new IOException("Unable to determine ZooKeeper ensemble");
@@ -1172,7 +1172,7 @@ public class ZKUtil {
createAndFailSilent(zkw,
(CreateAndFailSilent)ZKUtilOp.createAndFailSilent(znode, data));
}
-
+
private static void createAndFailSilent(ZooKeeperWatcher zkw, CreateAndFailSilent cafs)
throws KeeperException {
CreateRequest create = (CreateRequest)toZooKeeperOp(zkw, cafs).toRequestRecord();
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperManager.java
new file mode 100644
index 0000000..7e5f1ae
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperManager.java
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.zookeeper;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+
+import com.netflix.curator.ensemble.EnsembleProvider;
+import com.netflix.curator.framework.CuratorFramework;
+import com.netflix.curator.framework.CuratorFrameworkFactory;
+import com.netflix.curator.retry.ExponentialBackoffRetry;
+import com.netflix.curator.utils.ZookeeperFactory;
+
+/**
+ * Manages the ZooKeeper connection.
+ */
+@InterfaceAudience.Private
+class ZooKeeperManager {
+ private static final Log LOG = LogFactory.getLog(ZooKeeperManager.class);
+
+ private volatile CuratorFramework curatorClient;
+ private ManagedZooKeeperFactory zkFactory;
+ private Watcher zkWatcher;
+
+ private String connectString;
+ private int sessionTimeout;
+ private int maxRetries;
+ private int retryIntervalMillis;
+ private boolean canBeReadOnly;
+
+ /**
+ * Contains a set of watchers and forwards the events to all.
+ */
+ static class WatcherSet implements Watcher {
+ private Watcher[] watchers;
+
+ public WatcherSet(Watcher zkWatcher, Watcher curatorWatcher) {
+ this.watchers = new Watcher[] {zkWatcher, curatorWatcher};
+ }
+
+ @Override
+ public void process(WatchedEvent event) {
+ for (Watcher watcher: watchers) {
+ LOG.debug("WatcherSet: sending event:" + event + " to watcher: " + watcher);
+ watcher.process(event);
+ }
+ }
+
+ Watcher getCuratorWatcher() {
+ return watchers[1];
+ }
+ }
+
+ static class ManagedZooKeeper extends ZooKeeper {
+ public ManagedZooKeeper(String connectString, int sessionTimeout,
+ Watcher watcher, boolean canBeReadOnly) throws IOException {
+ super(connectString, sessionTimeout, watcher, canBeReadOnly);
+ }
+ @Override
+ public synchronized void close() throws InterruptedException {
+ //close is no-op since we do not want curator to close the connection
+ }
+ public void shutDown() throws InterruptedException {
+ super.close();
+ }
+ }
+
+ /**
+ * In HBase, we want to manage our own ZooKeeper connection, and we do not want curator
+ * to recreate the connection. This class bridges bridges curator and our zk-management.
+ */
+ static class ManagedZooKeeperFactory implements ZookeeperFactory {
+ class ManagedEnsembleProvider implements EnsembleProvider {
+ @Override
+ public void start() throws Exception {
+ }
+ @Override
+ public String getConnectionString() {
+ return String.valueOf(zk.getSessionId());
+ }
+ @Override
+ public void close() throws IOException {
+ }
+ }
+
+ ZooKeeperWatcher zkWatcher;
+ ManagedZooKeeper zk = null;
+ ManagedZooKeeper lastZk = null;
+ ManagedEnsembleProvider ensembleProvider;
+ WatcherSet doubleWatcher;
+
+ ManagedZooKeeperFactory(ZooKeeperWatcher zkWatcher) {
+ this.zkWatcher = zkWatcher;
+ this.ensembleProvider = new ManagedEnsembleProvider();
+ }
+ @Override
+ public synchronized ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher curatorWatcher,
+ boolean canBeReadOnly) throws Exception {
+ LOG.debug("Curator requested newZookeeper()");
+ if (doubleWatcher != null) {
+ return zk;
+ }
+ // Wait for the new zookeeper to be initialized from HBase
+ while (this.zk == lastZk || this.zk == null) {
+ this.wait();
+ }
+ this.doubleWatcher = new WatcherSet(zkWatcher, curatorWatcher);
+ this.zk.register(doubleWatcher);
+
+ // HACK: send the last connection event to curator so that it initializes
+ // it's state
+ WatchedEvent lastEvent = zkWatcher.getLastConnectionEvent();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Sending last zookeeper event to curator: " + lastEvent);
+ }
+ if (lastEvent != null) {
+ curatorWatcher.process(lastEvent);
+ }
+
+ return this.zk;
+ }
+
+ public synchronized void close() throws InterruptedException {
+ if (this.zk != null) {
+ this.zk.shutDown();
+ this.zk = null;
+ }
+ }
+
+ public synchronized void connect(String connectString, int sessionTimeout, Watcher watcher,
+ boolean canBeReadOnly) throws IOException {
+ synchronized (this) {
+ Watcher w = watcher;
+ if (this.doubleWatcher != null) {
+ //assume the watcher from Curator does not change while the CuratorFramework is alive
+ w = this.doubleWatcher = new WatcherSet(watcher, doubleWatcher.getCuratorWatcher());
+ }
+ this.lastZk = zk;
+ this.zk = new ManagedZooKeeper(connectString, sessionTimeout, w, canBeReadOnly);
+ this.notify();
+ }
+ }
+ }
+
+ public ZooKeeperManager(ZooKeeperWatcher zkWatcher, String connectString, int sessionTimeout,
+ int maxRetries, int retryIntervalMillis, boolean canBeReadOnly) {
+ this.zkWatcher = zkWatcher;
+ this.connectString = connectString;
+ this.sessionTimeout = sessionTimeout;
+ this.maxRetries = maxRetries;
+ this.retryIntervalMillis = retryIntervalMillis;
+ this.canBeReadOnly = canBeReadOnly;
+ this.zkFactory = new ManagedZooKeeperFactory(zkWatcher);
+ }
+
+ /**
+ * Initializes the curator framework. We call this only when we need it, so that
+ * curator is not initialized from the client side.
+ */
+ private synchronized void initializeCurator() {
+ if (this.curatorClient == null) {
+ this.curatorClient = CuratorFrameworkFactory.builder()
+ .sessionTimeoutMs(sessionTimeout)
+ .zookeeperFactory(zkFactory)
+ .ensembleProvider(zkFactory.ensembleProvider)
+ .retryPolicy(new ExponentialBackoffRetry(retryIntervalMillis, maxRetries))
+ .canBeReadOnly(canBeReadOnly)
+ .build();
+ this.curatorClient.start();
+ }
+ }
+
+ public void connect() throws IOException {
+ this.zkFactory.connect(connectString, sessionTimeout, zkWatcher, canBeReadOnly);
+
+ }
+
+ public void reconnect() throws IOException, InterruptedException {
+ zkFactory.close(); //we don't want to close the CuratorClient
+ connect();
+ }
+
+ /**
+ * Returns the managed CuratorFramework instance. This function ensures that
+ * before returning the curator instance is initialized
+ * @return CuratorFramework instance
+ */
+ public CuratorFramework getCuratorClient() {
+ if (curatorClient == null) {
+ //note: since curatorClient is volatile, this double checked locking
+ // is not broken under JDK5+ (http://en.wikipedia.org/wiki/Double-checked_locking)
+ initializeCurator();
+ }
+ return this.curatorClient;
+ }
+
+ /**
+ * Returns the underlying ZooKeeper instance
+ * @return ZooKeeper object
+ */
+ public ZooKeeper getZooKeeper() {
+ return this.zkFactory.zk;
+ }
+
+ public void close() throws InterruptedException {
+ if (this.curatorClient != null) {
+ this.curatorClient.close();
+ }
+ zkFactory.close();
+ }
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
index d10dd7c..c0a8469 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
@@ -110,6 +110,9 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
// znode containing the state of recovering regions
public String recoveringRegionsZNode;
+ //Keep around the last even about the connection
+ private WatchedEvent lastConnectionEvent;
+
// Certain ZooKeeper nodes need to be world-readable
public static final ArrayList CREATOR_ALL_AND_WORLD_READABLE =
new ArrayList() { {
@@ -362,6 +365,7 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
* @param event
*/
private void connectionEvent(WatchedEvent event) {
+ this.lastConnectionEvent = event;
switch(event.getState()) {
case SyncConnected:
// Now, this callback can be invoked before the this.zookeeper is set.
@@ -495,4 +499,11 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
return this.masterAddressZNode;
}
+ /**
+ * Returns the last connection event that has been received.
+ * @return a WatchedEvent that has been received
+ */
+ public WatchedEvent getLastConnectionEvent() {
+ return lastConnectionEvent;
+ }
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKLeaderManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKLeaderManager.java
index 99a214d..2452fb1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKLeaderManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKLeaderManager.java
@@ -18,14 +18,22 @@
package org.apache.hadoop.hbase.zookeeper;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -58,7 +66,7 @@ public class TestZKLeaderManager {
}
private static class MockLeader extends Thread implements Stoppable {
- private boolean stopped;
+ private volatile boolean stopped;
private ZooKeeperWatcher watcher;
private ZKLeaderManager zkLeader;
private AtomicBoolean master = new AtomicBoolean(false);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZooKeeperManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZooKeeperManager.java
new file mode 100644
index 0000000..efa9ecf
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZooKeeperManager.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.zookeeper;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.MediumTests;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.netflix.curator.framework.CuratorFramework;
+import com.netflix.curator.framework.imps.CuratorFrameworkState;
+
+@Category(MediumTests.class)
+public class TestZooKeeperManager {
+ private static final Log LOG = LogFactory.getLog(TestZooKeeperManager.class);
+
+ HBaseTestingUtility util;
+
+ @Before
+ public void setUp() throws Exception {
+ util = new HBaseTestingUtility();
+ util.startMiniZKCluster(1);
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ util.shutdownMiniZKCluster();
+ }
+
+ /** Returns the value of a private field */
+ @SuppressWarnings("unchecked")
+ T getField(O obj, String fieldName) throws SecurityException, NoSuchFieldException,
+ IllegalArgumentException, IllegalAccessException {
+ Class> clazz = obj.getClass();
+ Field field = clazz.getDeclaredField(fieldName);
+ field.setAccessible(true);
+ return (T) field.get(obj);
+ }
+
+ @Test
+ public void testCuratorIsNotInitializedIfNotUsed() throws Exception {
+ ZooKeeperWatcher zkWatcher = util.getZooKeeperWatcher();
+ //do a simple operation
+ zkWatcher.getRecoverableZooKeeper().exists(zkWatcher.baseZNode, false);
+
+ RecoverableZooKeeper zk = zkWatcher.getRecoverableZooKeeper();
+
+ ZooKeeperManager manager = getField(zk, "zkManager");
+ CuratorFramework curatorClient = getField(manager, "curatorClient");
+ Assert.assertNull(curatorClient);
+ }
+
+ @Test(timeout=20000)
+ public void testCuratorInitializationRightAfterZkWatcher() throws Exception {
+ ZooKeeperWatcher zkWatcher = util.getZooKeeperWatcher();
+ CuratorFramework curatorClient = zkWatcher.getRecoverableZooKeeper().getCuratorClient();
+ Assert.assertEquals(CuratorFrameworkState.STARTED, curatorClient.getState());
+ Assert.assertNotNull(curatorClient.checkExists().forPath("/"));
+ }
+
+ @Test(timeout=20000)
+ public void testCuratorInitializationThenSessionExpiration() throws Exception {
+ ZooKeeperWatcher zkWatcher = util.getZooKeeperWatcher();
+ Assert.assertNotNull(zkWatcher.getRecoverableZooKeeper().exists("/", false));
+ CuratorFramework curatorClient = zkWatcher.getRecoverableZooKeeper().getCuratorClient();
+ Assert.assertEquals(CuratorFrameworkState.STARTED, curatorClient.getState());
+ curatorClient.checkExists().forPath("/");
+
+ util.expireSession(zkWatcher);
+ Assert.assertEquals(CuratorFrameworkState.STARTED, curatorClient.getState());
+ zkWatcher.reconnectAfterExpiration();
+ Assert.assertNotNull(curatorClient.checkExists().forPath("/"));
+ }
+
+ @Test(timeout=20000)
+ public void testCuratorInitializationAfterSessionExpiration() throws Exception {
+ ZooKeeperWatcher zkWatcher = util.getZooKeeperWatcher();
+ Assert.assertNotNull(zkWatcher.getRecoverableZooKeeper().exists("/", false));
+ util.expireSession(zkWatcher);
+
+ CuratorFramework curatorClient = zkWatcher.getRecoverableZooKeeper().getCuratorClient();
+ Assert.assertEquals(CuratorFrameworkState.STARTED, curatorClient.getState());
+ zkWatcher.reconnectAfterExpiration();
+ Assert.assertNotNull(curatorClient.checkExists().forPath("/"));
+ }
+}
diff --git a/pom.xml b/pom.xml
index e27cc1b..5271303 100644
--- a/pom.xml
+++ b/pom.xml
@@ -901,6 +901,7 @@
1.9.0
2.4.1
1.0.1
+ 1.2.5
0.9.0
3.4.5
0.0.1-SNAPSHOT
@@ -1277,6 +1278,11 @@
${stax-api.version}