org.apache.zookeeper
zookeeper
Index: hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java
===================================================================
--- hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java (revision 1497039)
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java (working copy)
@@ -46,6 +46,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
+import com.netflix.curator.framework.CuratorFramework;
/**
* A zookeeper that can handle 'recoverable' errors.
@@ -74,14 +75,11 @@
@InterfaceStability.Evolving
public class RecoverableZooKeeper {
private static final Log LOG = LogFactory.getLog(RecoverableZooKeeper.class);
- // the actual ZooKeeper client instance
- volatile private ZooKeeper zk;
+ private volatile ZooKeeperManager zkManager;
private final RetryCounterFactory retryCounterFactory;
// An identifier of this process in the cluster
private final String identifier;
private final byte[] id;
- private Watcher watcher;
- private int sessionTimeout;
private String quorumServers;
private final Random salter;
@@ -99,16 +97,18 @@
private static final int ID_LENGTH_SIZE = Bytes.SIZEOF_INT;
public RecoverableZooKeeper(String quorumServers, int sessionTimeout,
- Watcher watcher, int maxRetries, int retryIntervalMillis)
+ ZooKeeperWatcher watcher, int maxRetries, int retryIntervalMillis)
throws IOException {
this(quorumServers, sessionTimeout, watcher, maxRetries, retryIntervalMillis,
null);
}
public RecoverableZooKeeper(String quorumServers, int sessionTimeout,
- Watcher watcher, int maxRetries, int retryIntervalMillis, String identifier)
+ ZooKeeperWatcher watcher, int maxRetries, int retryIntervalMillis, String identifier)
throws IOException {
- this.zk = new ZooKeeper(quorumServers, sessionTimeout, watcher);
+ this.zkManager = new ZooKeeperManager(watcher, quorumServers, sessionTimeout,
+ maxRetries, retryIntervalMillis, false);
+ this.zkManager.connect();
this.retryCounterFactory =
new RetryCounterFactory(maxRetries, retryIntervalMillis);
@@ -120,24 +120,22 @@
" connecting to ZooKeeper ensemble=" + quorumServers);
this.identifier = identifier;
this.id = Bytes.toBytes(identifier);
-
- this.watcher = watcher;
- this.sessionTimeout = sessionTimeout;
this.quorumServers = quorumServers;
salter = new SecureRandom();
}
public void reconnectAfterExpiration()
throws IOException, InterruptedException {
- LOG.info("Closing dead ZooKeeper connection, session" +
- " was: 0x"+Long.toHexString(zk.getSessionId()));
- zk.close();
- this.zk = new ZooKeeper(this.quorumServers,
- this.sessionTimeout, this.watcher);
- LOG.info("Recreated a ZooKeeper, session" +
- " is: 0x"+Long.toHexString(zk.getSessionId()));
+ LOG.info("Reconnecting expired ZooKeeper connection, session was: 0x" +
+ Long.toHexString(getZk().getSessionId()));
+ zkManager.reconnect();
+ LOG.info("Reconnected ZooKeeper session as: 0x"+Long.toHexString(getZk().getSessionId()));
}
+ private ZooKeeper getZk() {
+ return zkManager.getZooKeeper();
+ }
+
/**
* delete is an idempotent operation. Retry before throwing exception.
* This function will not throw NoNodeException if the path does not
@@ -149,7 +147,8 @@
boolean isRetry = false; // False for first attempt, true for all retries.
while (true) {
try {
- zk.delete(path, version);
+ ZooKeeper zk = getZk();
+ if (zk != null) zk.delete(path, version);
return;
} catch (KeeperException e) {
switch (e.code()) {
@@ -187,7 +186,7 @@
RetryCounter retryCounter = retryCounterFactory.create();
while (true) {
try {
- return zk.exists(path, watcher);
+ return getZk().exists(path, watcher);
} catch (KeeperException e) {
switch (e.code()) {
case CONNECTIONLOSS:
@@ -214,7 +213,7 @@
RetryCounter retryCounter = retryCounterFactory.create();
while (true) {
try {
- return zk.exists(path, watch);
+ return getZk().exists(path, watch);
} catch (KeeperException e) {
switch (e.code()) {
case CONNECTIONLOSS:
@@ -251,7 +250,7 @@
RetryCounter retryCounter = retryCounterFactory.create();
while (true) {
try {
- return zk.getChildren(path, watcher);
+ return getZk().getChildren(path, watcher);
} catch (KeeperException e) {
switch (e.code()) {
case CONNECTIONLOSS:
@@ -278,7 +277,7 @@
RetryCounter retryCounter = retryCounterFactory.create();
while (true) {
try {
- return zk.getChildren(path, watch);
+ return getZk().getChildren(path, watch);
} catch (KeeperException e) {
switch (e.code()) {
case CONNECTIONLOSS:
@@ -305,7 +304,7 @@
RetryCounter retryCounter = retryCounterFactory.create();
while (true) {
try {
- byte[] revData = zk.getData(path, watcher, stat);
+ byte[] revData = getZk().getData(path, watcher, stat);
return this.removeMetaData(revData);
} catch (KeeperException e) {
switch (e.code()) {
@@ -333,7 +332,7 @@
RetryCounter retryCounter = retryCounterFactory.create();
while (true) {
try {
- byte[] revData = zk.getData(path, watch, stat);
+ byte[] revData = getZk().getData(path, watch, stat);
return this.removeMetaData(revData);
} catch (KeeperException e) {
switch (e.code()) {
@@ -365,7 +364,7 @@
boolean isRetry = false;
while (true) {
try {
- return zk.setData(path, newData, version);
+ return getZk().setData(path, newData, version);
} catch (KeeperException e) {
switch (e.code()) {
case CONNECTIONLOSS:
@@ -378,7 +377,7 @@
// try to verify whether the previous setData success or not
try{
Stat stat = new Stat();
- byte[] revData = zk.getData(path, false, stat);
+ byte[] revData = getZk().getData(path, false, stat);
if(Bytes.compareTo(revData, newData) == 0) {
// the bad version is caused by previous successful setData
return stat;
@@ -400,8 +399,8 @@
}
/**
+ * NONSEQUENTIAL create is idempotent operation.
*
- * NONSEQUENTIAL create is idempotent operation.
* Retry before throwing exceptions.
* But this function will not throw the NodeExist exception back to the
* application.
@@ -410,8 +409,6 @@
* But SEQUENTIAL is NOT idempotent operation. It is necessary to add
* identifier to the path to verify, whether the previous one is successful
* or not.
- *
- *
* @return Path
*/
public String create(String path, byte[] data, List acl,
@@ -439,7 +436,7 @@
boolean isRetry = false; // False for first attempt, true for all retries.
while (true) {
try {
- return zk.create(path, data, acl, createMode);
+ return getZk().create(path, data, acl, createMode);
} catch (KeeperException e) {
switch (e.code()) {
case NODEEXISTS:
@@ -447,7 +444,7 @@
// If the connection was lost, there is still a possibility that
// we have successfully created the node at our previous attempt,
// so we read the node and compare.
- byte[] currentData = zk.getData(path, false, null);
+ byte[] currentData = getZk().getData(path, false, null);
if (currentData != null &&
Bytes.compareTo(currentData, data) == 0) {
// We successfully created a non-sequential node
@@ -494,7 +491,7 @@
}
}
first = false;
- return zk.create(newPath, data, acl, createMode);
+ return getZk().create(newPath, data, acl, createMode);
} catch (KeeperException e) {
switch (e.code()) {
case CONNECTIONLOSS:
@@ -548,7 +545,7 @@
Iterable multiOps = prepareZKMulti(ops);
while (true) {
try {
- return zk.multi(multiOps);
+ return getZk().multi(multiOps);
} catch (KeeperException e) {
switch (e.code()) {
case CONNECTIONLOSS:
@@ -573,11 +570,11 @@
String parent = path.substring(0, lastSlashIdx);
String nodePrefix = path.substring(lastSlashIdx+1);
- List nodes = zk.getChildren(parent, false);
+ List nodes = getZk().getChildren(parent, false);
List matching = filterByPrefix(nodes, nodePrefix);
for (String node : matching) {
String nodePath = parent + "/" + node;
- Stat stat = zk.exists(nodePath, false);
+ Stat stat = getZk().exists(nodePath, false);
if (stat != null) {
return nodePath;
}
@@ -621,30 +618,39 @@
}
public long getSessionId() {
- return zk.getSessionId();
+ return getZk().getSessionId();
}
public void close() throws InterruptedException {
- zk.close();
+ zkManager.close();
}
public States getState() {
- return zk.getState();
+ return getZk().getState();
}
public ZooKeeper getZooKeeper() {
- return zk;
+ return getZk();
}
public byte[] getSessionPasswd() {
- return zk.getSessionPasswd();
+ return getZk().getSessionPasswd();
}
public void sync(String path, AsyncCallback.VoidCallback cb, Object ctx) {
- this.zk.sync(path, null, null);
+ this.getZk().sync(path, cb, ctx); // pass the caller's callback and context through
}
/**
+ * Returns the managed CuratorFramework instance. This function ensures that
+ * the curator instance is initialized before it is returned.
+ * @return CuratorFramework instance
+ */
+ public CuratorFramework getCuratorClient() {
+ return zkManager.getCuratorClient();
+ }
+
+ /**
* Filters the given node list by the given prefixes.
* This method is all-inclusive--if any element in the node list starts
* with any of the given prefixes, then it is included in the result.
Index: hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
===================================================================
--- hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java (revision 1497039)
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java (working copy)
@@ -103,7 +103,7 @@
* @return connection to zookeeper
* @throws IOException if unable to connect to zk or config problem
*/
- public static RecoverableZooKeeper connect(Configuration conf, Watcher watcher)
+ public static RecoverableZooKeeper connect(Configuration conf, ZooKeeperWatcher watcher)
throws IOException {
Properties properties = ZKConfig.makeZKProps(conf);
String ensemble = ZKConfig.getZKQuorumServersString(properties);
@@ -111,13 +111,13 @@
}
public static RecoverableZooKeeper connect(Configuration conf, String ensemble,
- Watcher watcher)
+ ZooKeeperWatcher watcher)
throws IOException {
return connect(conf, ensemble, watcher, null);
}
public static RecoverableZooKeeper connect(Configuration conf, String ensemble,
- Watcher watcher, final String identifier)
+ ZooKeeperWatcher watcher, final String identifier)
throws IOException {
if(ensemble == null) {
throw new IOException("Unable to determine ZooKeeper ensemble");
@@ -1172,7 +1172,7 @@
createAndFailSilent(zkw,
(CreateAndFailSilent)ZKUtilOp.createAndFailSilent(znode, data));
}
-
+
private static void createAndFailSilent(ZooKeeperWatcher zkw, CreateAndFailSilent cafs)
throws KeeperException {
CreateRequest create = (CreateRequest)toZooKeeperOp(zkw, cafs).toRequestRecord();
Index: hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperManager.java
===================================================================
--- hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperManager.java (revision 0)
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperManager.java (working copy)
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.zookeeper;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+
+import com.netflix.curator.ensemble.EnsembleProvider;
+import com.netflix.curator.framework.CuratorFramework;
+import com.netflix.curator.framework.CuratorFrameworkFactory;
+import com.netflix.curator.retry.ExponentialBackoffRetry;
+import com.netflix.curator.utils.ZookeeperFactory;
+
+/**
+ * Manages the ZooKeeper connection.
+ */
+@InterfaceAudience.Private
+class ZooKeeperManager {
+ private static final Log LOG = LogFactory.getLog(ZooKeeperManager.class);
+
+ private volatile CuratorFramework curatorClient;
+ private ManagedZooKeeperFactory zkFactory;
+ private Watcher zkWatcher;
+
+ private String connectString;
+ private int sessionTimeout;
+ private int maxRetries;
+ private int retryIntervalMillis;
+ private boolean canBeReadOnly;
+
+ /**
+ * Forwards every event to zkWatcher and then to curatorWatcher, in that order.
+ */
+ static class WatcherSet implements Watcher {
+ private final Watcher[] watchers; // assigned once in the constructor, never replaced
+ public WatcherSet(Watcher zkWatcher, Watcher curatorWatcher) {
+ this.watchers = new Watcher[] {zkWatcher, curatorWatcher};
+ }
+ @Override
+ public void process(WatchedEvent event) {
+ for (Watcher watcher: watchers) {
+ if (LOG.isDebugEnabled()) { // avoid building the message for every event when debug is off
+ LOG.debug("WatcherSet: sending event:" + event + " to watcher: " + watcher);
+ }
+ watcher.process(event);
+ }
+ }
+
+ Watcher getCuratorWatcher() {
+ return watchers[1]; // index 1 holds the curatorWatcher passed to the constructor
+ }
+ }
+
+ static class ManagedZooKeeper extends ZooKeeper { // ZooKeeper whose close() is disabled
+ public ManagedZooKeeper(String connectString, int sessionTimeout,
+ Watcher watcher, boolean canBeReadOnly) throws IOException {
+ super(connectString, sessionTimeout, watcher, canBeReadOnly);
+ }
+ @Override
+ public synchronized void close() throws InterruptedException {
+ // Deliberately a no-op: we do not want curator to close the connection. See shutDown().
+ }
+ public void shutDown() throws InterruptedException { // the real close, via super.close()
+ super.close();
+ }
+ }
+
+ /**
+ * In HBase, we want to manage our own ZooKeeper connection, and we do not want curator
+ * to recreate the connection. This class bridges curator and our zk-management.
+ */
+ static class ManagedZooKeeperFactory implements ZookeeperFactory {
+ class ManagedEnsembleProvider implements EnsembleProvider {
+ @Override
+ public void start() throws Exception {
+ }
+ @Override
+ public String getConnectionString() {
+ return String.valueOf(zk.getSessionId());
+ }
+ @Override
+ public void close() throws IOException {
+ }
+ }
+
+ ZooKeeperWatcher zkWatcher;
+ ManagedZooKeeper zk = null;
+ ManagedZooKeeper lastZk = null;
+ ManagedEnsembleProvider ensembleProvider;
+ WatcherSet doubleWatcher;
+
+ ManagedZooKeeperFactory(ZooKeeperWatcher zkWatcher) {
+ this.zkWatcher = zkWatcher;
+ this.ensembleProvider = new ManagedEnsembleProvider();
+ }
+ @Override
+ public synchronized ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher curatorWatcher,
+ boolean canBeReadOnly) throws Exception {
+ LOG.debug("Curator requested newZookeeper()");
+ if (doubleWatcher != null) {
+ return zk;
+ }
+ // Wait for the new zookeeper to be initialized from HBase
+ while (this.zk == lastZk || this.zk == null) {
+ this.wait();
+ }
+ this.doubleWatcher = new WatcherSet(zkWatcher, curatorWatcher);
+ this.zk.register(doubleWatcher);
+
+ // HACK: send the last connection event to curator so that it initializes
+ // its state
+ WatchedEvent lastEvent = zkWatcher.getLastConnectionEvent();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Sending last zookeeper event to curator: " + lastEvent);
+ }
+ if (lastEvent != null) {
+ curatorWatcher.process(lastEvent);
+ }
+
+ return this.zk;
+ }
+
+ public synchronized void close() throws InterruptedException {
+ if (this.zk != null) {
+ this.zk.shutDown();
+ this.zk = null;
+ }
+ }
+
+ public synchronized void connect(String connectString, int sessionTimeout, Watcher watcher,
+ boolean canBeReadOnly) throws IOException {
+ // The method is already synchronized on this factory, so no nested synchronized block.
+ Watcher w = watcher;
+ if (this.doubleWatcher != null) {
+ //assume the watcher from Curator does not change while the CuratorFramework is alive
+ w = this.doubleWatcher = new WatcherSet(watcher, doubleWatcher.getCuratorWatcher());
+ }
+ this.lastZk = zk;
+ this.zk = new ManagedZooKeeper(connectString, sessionTimeout, w, canBeReadOnly);
+ // Wake every thread waiting in newZooKeeper(); each one re-checks its loop condition.
+ this.notifyAll();
+ }
+ }
+
+ public ZooKeeperManager(ZooKeeperWatcher zkWatcher, String connectString, int sessionTimeout,
+ int maxRetries, int retryIntervalMillis, boolean canBeReadOnly) {
+ this.zkWatcher = zkWatcher;
+ this.connectString = connectString;
+ this.sessionTimeout = sessionTimeout;
+ this.maxRetries = maxRetries;
+ this.retryIntervalMillis = retryIntervalMillis;
+ this.canBeReadOnly = canBeReadOnly;
+ this.zkFactory = new ManagedZooKeeperFactory(zkWatcher);
+ }
+
+ /**
+ * Initializes the curator framework. We call this only when we need it, so that
+ * curator is not initialized from the client side.
+ */
+ private synchronized void initializeCurator() {
+ if (this.curatorClient == null) {
+ this.curatorClient = CuratorFrameworkFactory.builder()
+ .sessionTimeoutMs(sessionTimeout)
+ .zookeeperFactory(zkFactory)
+ .ensembleProvider(zkFactory.ensembleProvider)
+ .retryPolicy(new ExponentialBackoffRetry(retryIntervalMillis, maxRetries))
+ .canBeReadOnly(canBeReadOnly)
+ .build();
+ this.curatorClient.start();
+ }
+ }
+
+ public void connect() throws IOException {
+ this.zkFactory.connect(connectString, sessionTimeout, zkWatcher, canBeReadOnly);
+
+ }
+
+ public void reconnect() throws IOException, InterruptedException {
+ zkFactory.close(); //we don't want to close the CuratorClient
+ connect();
+ }
+
+ /**
+ * Returns the managed CuratorFramework instance. This function ensures that
+ * the curator instance is initialized before it is returned.
+ * @return CuratorFramework instance
+ */
+ public CuratorFramework getCuratorClient() {
+ if (curatorClient == null) {
+ //note: since curatorClient is volatile, this double checked locking
+ // is not broken under JDK5+ (http://en.wikipedia.org/wiki/Double-checked_locking)
+ initializeCurator();
+ }
+ return this.curatorClient;
+ }
+
+ /**
+ * Returns the underlying ZooKeeper instance
+ * @return ZooKeeper object
+ */
+ public ZooKeeper getZooKeeper() {
+ return this.zkFactory.zk;
+ }
+
+ public void close() throws InterruptedException {
+ if (this.curatorClient != null) {
+ this.curatorClient.close();
+ }
+ zkFactory.close();
+ }
+}
Index: hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
===================================================================
--- hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java (revision 1497039)
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java (working copy)
@@ -110,6 +110,9 @@
// znode containing the state of recovering regions
public String recoveringRegionsZNode;
+ // Keep around the last event about the connection; volatile as reader and writer may be different threads
+ private volatile WatchedEvent lastConnectionEvent;
+
// Certain ZooKeeper nodes need to be world-readable
public static final ArrayList CREATOR_ALL_AND_WORLD_READABLE =
new ArrayList() { {
@@ -362,6 +365,7 @@
* @param event
*/
private void connectionEvent(WatchedEvent event) {
+ this.lastConnectionEvent = event;
switch(event.getState()) {
case SyncConnected:
// Now, this callback can be invoked before the this.zookeeper is set.
@@ -495,4 +499,11 @@
return this.masterAddressZNode;
}
+ /**
+ * Returns the last connection event that has been received.
+ * @return a WatchedEvent that has been received
+ */
+ public WatchedEvent getLastConnectionEvent() {
+ return lastConnectionEvent;
+ }
}
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestRecoverableZooKeeper.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestRecoverableZooKeeper.java (revision 1497039)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestRecoverableZooKeeper.java (working copy)
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.zookeeper;
-
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.util.Properties;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.MediumTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-@Category(MediumTests.class)
-public class TestRecoverableZooKeeper {
-
- private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
-
- Abortable abortable = new Abortable() {
- @Override
- public void abort(String why, Throwable e) {
-
- }
-
- @Override
- public boolean isAborted() {
- return false;
- }
- };
-
- @BeforeClass
- public static void setUpBeforeClass() throws Exception {
- TEST_UTIL.startMiniZKCluster();
- }
-
- @AfterClass
- public static void tearDownAfterClass() throws Exception {
- TEST_UTIL.shutdownMiniZKCluster();
- }
-
- @Test
- public void testSetDataVersionMismatchInLoop() throws Exception {
- String znode = "/hbase/region-in-transition/9af7cfc9b15910a0b3d714bf40a3248f";
- Configuration conf = TEST_UTIL.getConfiguration();
- Properties properties = ZKConfig.makeZKProps(conf);
- ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "testSetDataVersionMismatchInLoop",
- abortable, true);
- String ensemble = ZKConfig.getZKQuorumServersString(properties);
- RecoverableZooKeeper rzk = ZKUtil.connect(conf, ensemble, zkw);
- rzk.create(znode, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- rzk.setData(znode, "OPENING".getBytes(), 0);
- Field zkField = RecoverableZooKeeper.class.getDeclaredField("zk");
- zkField.setAccessible(true);
- int timeout = conf.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
- ZookeeperStub zkStub = new ZookeeperStub(ensemble, timeout, zkw);
- zkStub.setThrowExceptionInNumOperations(1);
- zkField.set(rzk, zkStub);
- byte[] opened = "OPENED".getBytes();
- rzk.setData(znode, opened, 1);
- byte[] data = rzk.getData(znode, false, new Stat());
- assertTrue(Bytes.equals(opened, data));
- }
-
- class ZookeeperStub extends ZooKeeper {
-
- private int throwExceptionInNumOperations;
-
- public ZookeeperStub(String connectString, int sessionTimeout, Watcher watcher)
- throws IOException {
- super(connectString, sessionTimeout, watcher);
- }
-
- public void setThrowExceptionInNumOperations(int throwExceptionInNumOperations) {
- this.throwExceptionInNumOperations = throwExceptionInNumOperations;
- }
-
- private void checkThrowKeeperException() throws KeeperException {
- if (throwExceptionInNumOperations == 1) {
- throwExceptionInNumOperations = 0;
- throw new KeeperException.ConnectionLossException();
- }
- if (throwExceptionInNumOperations > 0)
- throwExceptionInNumOperations--;
- }
-
- @Override
- public Stat setData(String path, byte[] data, int version) throws KeeperException,
- InterruptedException {
- Stat stat = super.setData(path, data, version);
- checkThrowKeeperException();
- return stat;
- }
- }
-}
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKLeaderManager.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKLeaderManager.java (revision 1497039)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKLeaderManager.java (working copy)
@@ -18,14 +18,22 @@
package org.apache.hadoop.hbase.zookeeper;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -58,7 +66,7 @@
}
private static class MockLeader extends Thread implements Stoppable {
- private boolean stopped;
+ private volatile boolean stopped;
private ZooKeeperWatcher watcher;
private ZKLeaderManager zkLeader;
private AtomicBoolean master = new AtomicBoolean(false);
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZooKeeperManager.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZooKeeperManager.java (revision 0)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZooKeeperManager.java (working copy)
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.zookeeper;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.MediumTests;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.netflix.curator.framework.CuratorFramework;
+import com.netflix.curator.framework.imps.CuratorFrameworkState;
+
+@Category(MediumTests.class)
+public class TestZooKeeperManager {
+ private static final Log LOG = LogFactory.getLog(TestZooKeeperManager.class);
+
+ HBaseTestingUtility util;
+
+ @Before
+ public void setUp() throws Exception {
+ util = new HBaseTestingUtility();
+ util.startMiniZKCluster(1);
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ util.shutdownMiniZKCluster();
+ }
+
+ /** Returns the value of the named field declared directly on obj's class, even if private. */
+ @SuppressWarnings("unchecked") // the caller picks T; the cast cannot be verified at runtime
+ <T, O> T getField(O obj, String fieldName) throws SecurityException, NoSuchFieldException,
+ IllegalArgumentException, IllegalAccessException {
+ Class<?> clazz = obj.getClass();
+ Field field = clazz.getDeclaredField(fieldName);
+ field.setAccessible(true); // bypass access checks so non-public fields can be read
+ return (T) field.get(obj);
+ }
+
+ @Test
+ public void testCuratorIsNotInitializedIfNotUsed() throws Exception {
+ ZooKeeperWatcher zkWatcher = util.getZooKeeperWatcher();
+ //do a simple operation
+ zkWatcher.getRecoverableZooKeeper().exists(zkWatcher.baseZNode, false);
+
+ RecoverableZooKeeper zk = zkWatcher.getRecoverableZooKeeper();
+
+ ZooKeeperManager manager = getField(zk, "zkManager");
+ CuratorFramework curatorClient = getField(manager, "curatorClient");
+ Assert.assertNull(curatorClient);
+ }
+
+ @Test(timeout=20000)
+ public void testCuratorInitializationRightAfterZkWatcher() throws Exception {
+ ZooKeeperWatcher zkWatcher = util.getZooKeeperWatcher();
+ CuratorFramework curatorClient = zkWatcher.getRecoverableZooKeeper().getCuratorClient();
+ Assert.assertEquals(CuratorFrameworkState.STARTED, curatorClient.getState());
+ Assert.assertNotNull(curatorClient.checkExists().forPath("/"));
+ }
+
+ @Test(timeout=20000)
+ public void testCuratorInitializationThenSessionExpiration() throws Exception {
+ ZooKeeperWatcher zkWatcher = util.getZooKeeperWatcher();
+ Assert.assertNotNull(zkWatcher.getRecoverableZooKeeper().exists("/", false));
+ CuratorFramework curatorClient = zkWatcher.getRecoverableZooKeeper().getCuratorClient();
+ Assert.assertEquals(CuratorFrameworkState.STARTED, curatorClient.getState());
+ curatorClient.checkExists().forPath("/");
+
+ util.expireSession(zkWatcher);
+ Assert.assertEquals(CuratorFrameworkState.STARTED, curatorClient.getState());
+ zkWatcher.reconnectAfterExpiration();
+ Assert.assertNotNull(curatorClient.checkExists().forPath("/"));
+ }
+
+ @Test(timeout=20000)
+ public void testCuratorInitializationAfterSessionExpiration() throws Exception {
+ ZooKeeperWatcher zkWatcher = util.getZooKeeperWatcher();
+ Assert.assertNotNull(zkWatcher.getRecoverableZooKeeper().exists("/", false));
+ util.expireSession(zkWatcher);
+
+ CuratorFramework curatorClient = zkWatcher.getRecoverableZooKeeper().getCuratorClient();
+ Assert.assertEquals(CuratorFrameworkState.STARTED, curatorClient.getState());
+ zkWatcher.reconnectAfterExpiration();
+ Assert.assertNotNull(curatorClient.checkExists().forPath("/"));
+ }
+}
Index: pom.xml
===================================================================
--- pom.xml (revision 1497039)
+++ pom.xml (working copy)
@@ -910,6 +910,7 @@
1.9.0
2.4.1
1.0.1
+ 1.2.5
0.9.0
3.4.5
0.0.1-SNAPSHOT
@@ -1286,6 +1287,11 @@
${stax-api.version}