diff --git hbase-server/pom.xml hbase-server/pom.xml index 53c643b..f14581f 100644 --- hbase-server/pom.xml +++ hbase-server/pom.xml @@ -419,6 +419,10 @@ org.jamon jamon-runtime + + com.netflix.curator + curator-recipes + com.google.protobuf diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java index 4b355f7..4a0aa1a 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java @@ -41,26 +41,28 @@ import org.apache.zookeeper.ZooKeeper.States; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Stat; +import com.netflix.curator.framework.CuratorFramework; + /** * A zookeeper that can handle 'recoverable' errors. - * To handle recoverable errors, developers need to realize that there are two - * classes of requests: idempotent and non-idempotent requests. Read requests - * and unconditional sets and deletes are examples of idempotent requests, they - * can be reissued with the same results. - * (Although, the delete may throw a NoNodeException on reissue its effect on - * the ZooKeeper state is the same.) Non-idempotent requests need special - * handling, application and library writers need to keep in mind that they may - * need to encode information in the data or name of znodes to detect - * retries. A simple example is a create that uses a sequence flag. - * If a process issues a create("/x-", ..., SEQUENCE) and gets a connection - * loss exception, that process will reissue another - * create("/x-", ..., SEQUENCE) and get back x-111. When the process does a - * getChildren("/"), it sees x-1,x-30,x-109,x-110,x-111, now it could be - * that x-109 was the result of the previous create, so the process actually - * owns both x-109 and x-111. An easy way around this is to use "x-process id-" + * To handle recoverable errors, developers need to realize that there are two + * classes of requests: idempotent and non-idempotent requests. Read requests + * and unconditional sets and deletes are examples of idempotent requests, they + * can be reissued with the same results. + * (Although, the delete may throw a NoNodeException on reissue its effect on + * the ZooKeeper state is the same.) Non-idempotent requests need special + * handling, application and library writers need to keep in mind that they may + * need to encode information in the data or name of znodes to detect + * retries. A simple example is a create that uses a sequence flag. + * If a process issues a create("/x-", ..., SEQUENCE) and gets a connection + * loss exception, that process will reissue another + * create("/x-", ..., SEQUENCE) and get back x-111. When the process does a + * getChildren("/"), it sees x-1,x-30,x-109,x-110,x-111, now it could be + * that x-109 was the result of the previous create, so the process actually + * owns both x-109 and x-111. An easy way around this is to use "x-process id-" * when doing the create. If the process is using an id of 352, before reissuing - * the create it will do a getChildren("/") and see "x-222-1", "x-542-30", - * "x-352-109", x-333-110". The process will know that the original create + * the create it will do a getChildren("/") and see "x-222-1", "x-542-30", + * "x-352-109", x-333-110". The process will know that the original create * succeeded an the znode it created is "x-352-109". 
* @see "http://wiki.apache.org/hadoop/ZooKeeper/ErrorHandling" */ @@ -68,8 +70,8 @@ import org.apache.zookeeper.data.Stat; @InterfaceStability.Evolving public class RecoverableZooKeeper { private static final Log LOG = LogFactory.getLog(RecoverableZooKeeper.class); - // the actual ZooKeeper client instance - private ZooKeeper zk; + + private ZooKeeperManager zkManager; private final RetryCounterFactory retryCounterFactory; // An identifier of this process in the cluster private final String identifier; @@ -93,9 +95,11 @@ public class RecoverableZooKeeper { private static final int ID_LENGTH_SIZE = Bytes.SIZEOF_INT; public RecoverableZooKeeper(String quorumServers, int sessionTimeout, - Watcher watcher, int maxRetries, int retryIntervalMillis) + ZooKeeperWatcher watcher, int maxRetries, int retryIntervalMillis) throws IOException { - this.zk = new ZooKeeper(quorumServers, sessionTimeout, watcher); + this.zkManager = new ZooKeeperManager(watcher, quorumServers, sessionTimeout, + maxRetries, retryIntervalMillis, false); + this.zkManager.connect(); this.retryCounterFactory = new RetryCounterFactory(maxRetries, retryIntervalMillis); @@ -113,12 +117,14 @@ public class RecoverableZooKeeper { public void reconnectAfterExpiration() throws IOException, InterruptedException { LOG.info("Closing dead ZooKeeper connection, session" + - " was: 0x"+Long.toHexString(zk.getSessionId())); - zk.close(); - this.zk = new ZooKeeper(this.quorumServers, - this.sessionTimeout, this.watcher); + " was: 0x"+Long.toHexString(getZk().getSessionId())); + zkManager.reconnect(); LOG.info("Recreated a ZooKeeper, session" + - " is: 0x"+Long.toHexString(zk.getSessionId())); + " is: 0x"+Long.toHexString(getZk().getSessionId())); + } + + private ZooKeeper getZk() { + return zkManager.getZooKeeper(); } /** @@ -132,7 +138,7 @@ public class RecoverableZooKeeper { boolean isRetry = false; // False for first attempt, true for all retries. 
while (true) { try { - zk.delete(path, version); + getZk().delete(path, version); return; } catch (KeeperException e) { switch (e.code()) { @@ -170,7 +176,7 @@ public class RecoverableZooKeeper { RetryCounter retryCounter = retryCounterFactory.create(); while (true) { try { - return zk.exists(path, watcher); + return getZk().exists(path, watcher); } catch (KeeperException e) { switch (e.code()) { case CONNECTIONLOSS: @@ -197,7 +203,7 @@ public class RecoverableZooKeeper { RetryCounter retryCounter = retryCounterFactory.create(); while (true) { try { - return zk.exists(path, watch); + return getZk().exists(path, watch); } catch (KeeperException e) { switch (e.code()) { case CONNECTIONLOSS: @@ -234,7 +240,7 @@ public class RecoverableZooKeeper { RetryCounter retryCounter = retryCounterFactory.create(); while (true) { try { - return zk.getChildren(path, watcher); + return getZk().getChildren(path, watcher); } catch (KeeperException e) { switch (e.code()) { case CONNECTIONLOSS: @@ -261,7 +267,7 @@ public class RecoverableZooKeeper { RetryCounter retryCounter = retryCounterFactory.create(); while (true) { try { - return zk.getChildren(path, watch); + return getZk().getChildren(path, watch); } catch (KeeperException e) { switch (e.code()) { case CONNECTIONLOSS: @@ -288,7 +294,7 @@ public class RecoverableZooKeeper { RetryCounter retryCounter = retryCounterFactory.create(); while (true) { try { - byte[] revData = zk.getData(path, watcher, stat); + byte[] revData = getZk().getData(path, watcher, stat); return this.removeMetaData(revData); } catch (KeeperException e) { switch (e.code()) { @@ -316,7 +322,7 @@ public class RecoverableZooKeeper { RetryCounter retryCounter = retryCounterFactory.create(); while (true) { try { - byte[] revData = zk.getData(path, watch, stat); + byte[] revData = getZk().getData(path, watch, stat); return this.removeMetaData(revData); } catch (KeeperException e) { switch (e.code()) { @@ -337,7 +343,7 @@ public class RecoverableZooKeeper { /** * setData is NOT an idempotent operation. Retry may cause BadVersion Exception - * Adding an identifier field into the data to check whether + * Adding an identifier field into the data to check whether * badversion is caused by the result of previous correctly setData * @return Stat instance */ @@ -348,7 +354,7 @@ public class RecoverableZooKeeper { boolean isRetry = false; while (true) { try { - return zk.setData(path, newData, version); + return getZk().setData(path, newData, version); } catch (KeeperException e) { switch (e.code()) { case CONNECTIONLOSS: @@ -361,7 +367,7 @@ public class RecoverableZooKeeper { // try to verify whether the previous setData success or not try{ Stat stat = new Stat(); - byte[] revData = zk.getData(path, false, stat); + byte[] revData = getZk().getData(path, false, stat); if(Bytes.compareTo(revData, newData) == 0) { // the bad version is caused by previous successful setData return stat; @@ -384,17 +390,17 @@ public class RecoverableZooKeeper { /** *

- * NONSEQUENTIAL create is idempotent operation. + * NONSEQUENTIAL create is an idempotent operation. * Retry before throwing exceptions. * But this function will not throw the NodeExist exception back to the * application. *
*
- * But SEQUENTIAL is NOT idempotent operation. It is necessary to add - * identifier to the path to verify, whether the previous one is successful + * But SEQUENTIAL is NOT an idempotent operation. It is necessary to add an + * identifier to the path to verify whether the previous create succeeded * or not.
- * + * * @return Path */ public String create(String path, byte[] data, List acl, @@ -411,32 +417,32 @@ public class RecoverableZooKeeper { return createSequential(path, newData, acl, createMode); default: - throw new IllegalArgumentException("Unrecognized CreateMode: " + + throw new IllegalArgumentException("Unrecognized CreateMode: " + createMode); } } - private String createNonSequential(String path, byte[] data, List acl, + private String createNonSequential(String path, byte[] data, List acl, CreateMode createMode) throws KeeperException, InterruptedException { RetryCounter retryCounter = retryCounterFactory.create(); boolean isRetry = false; // False for first attempt, true for all retries. while (true) { try { - return zk.create(path, data, acl, createMode); + return getZk().create(path, data, acl, createMode); } catch (KeeperException e) { switch (e.code()) { case NODEEXISTS: if (isRetry) { // If the connection was lost, there is still a possibility that // we have successfully created the node at our previous attempt, - // so we read the node and compare. - byte[] currentData = zk.getData(path, false, null); + // so we read the node and compare. + byte[] currentData = getZk().getData(path, false, null); if (currentData != null && - Bytes.compareTo(currentData, data) == 0) { + Bytes.compareTo(currentData, data) == 0) { // We successfully created a non-sequential node return path; } - LOG.error("Node " + path + " already exists with " + + LOG.error("Node " + path + " already exists with " + Bytes.toStringBinary(currentData) + ", could not write " + Bytes.toStringBinary(data)); throw e; @@ -460,8 +466,8 @@ public class RecoverableZooKeeper { isRetry = true; } } - - private String createSequential(String path, byte[] data, + + private String createSequential(String path, byte[] data, List acl, CreateMode createMode) throws KeeperException, InterruptedException { RetryCounter retryCounter = retryCounterFactory.create(); @@ -477,7 +483,7 @@ public class RecoverableZooKeeper { } } first = false; - return zk.create(newPath, data, acl, createMode); + return getZk().create(newPath, data, acl, createMode); } catch (KeeperException e) { switch (e.code()) { case CONNECTIONLOSS: @@ -502,18 +508,18 @@ public class RecoverableZooKeeper { String parent = path.substring(0, lastSlashIdx); String nodePrefix = path.substring(lastSlashIdx+1); - List nodes = zk.getChildren(parent, false); + List nodes = getZk().getChildren(parent, false); List matching = filterByPrefix(nodes, nodePrefix); for (String node : matching) { String nodePath = parent + "/" + node; - Stat stat = zk.exists(nodePath, false); + Stat stat = getZk().exists(nodePath, false); if (stat != null) { return nodePath; } } return null; } - + public byte[] removeMetaData(byte[] data) { if(data == null || data.length == 0) { return data; @@ -550,27 +556,36 @@ public class RecoverableZooKeeper { } public long getSessionId() { - return zk.getSessionId(); + return getZk().getSessionId(); } public void close() throws InterruptedException { - zk.close(); + zkManager.close(); } public States getState() { - return zk.getState(); + return getZk().getState(); } public ZooKeeper getZooKeeper() { - return zk; + return getZk(); } public byte[] getSessionPasswd() { - return zk.getSessionPasswd(); + return getZk().getSessionPasswd(); } public void sync(String path, AsyncCallback.VoidCallback cb, Object ctx) { - this.zk.sync(path, null, null); + this.getZk().sync(path, null, null); + } + + /** + * Returns the managed CuratorFramework instance. 
This function ensures that + * before returning the curator instance is initialized + * @return CuratorFramework instance + */ + public CuratorFramework getCuratorClient() { + return zkManager.getCuratorClient(); } /** @@ -582,7 +597,7 @@ public class RecoverableZooKeeper { * @param prefixes the prefixes to include in the result * @return list of every element that starts with one of the prefixes */ - private static List filterByPrefix(List nodes, + private static List filterByPrefix(List nodes, String... prefixes) { List lockChildren = new ArrayList(); for (String child : nodes){ diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java index b583722..dfa8e50 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java @@ -23,21 +23,16 @@ import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.InetSocketAddress; -import java.net.InetAddress; import java.net.Socket; import java.util.ArrayList; -import java.util.List; -import java.util.Properties; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.Properties; -import javax.security.auth.login.LoginException; import javax.security.auth.login.AppConfigurationEntry; import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag; -import org.apache.hadoop.security.SecurityUtil; -import org.apache.hadoop.security.authentication.util.KerberosUtil; - import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -49,16 +44,17 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.authentication.util.KerberosUtil; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.NoNodeException; -import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.client.ZooKeeperSaslClient; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Stat; -import org.apache.zookeeper.client.ZooKeeperSaslClient; import org.apache.zookeeper.server.ZooKeeperSaslServer; /** @@ -90,7 +86,7 @@ public class ZKUtil { * @return connection to zookeeper * @throws IOException if unable to connect to zk or config problem */ - public static RecoverableZooKeeper connect(Configuration conf, Watcher watcher) + public static RecoverableZooKeeper connect(Configuration conf, ZooKeeperWatcher watcher) throws IOException { Properties properties = ZKConfig.makeZKProps(conf); String ensemble = ZKConfig.getZKQuorumServersString(properties); @@ -98,13 +94,13 @@ public class ZKUtil { } public static RecoverableZooKeeper connect(Configuration conf, String ensemble, - Watcher watcher) + ZooKeeperWatcher watcher) throws IOException { return connect(conf, ensemble, watcher, ""); } public static RecoverableZooKeeper connect(Configuration conf, String ensemble, - Watcher watcher, final String descriptor) + ZooKeeperWatcher watcher, final String descriptor) throws IOException { if(ensemble == null) { throw new 
IOException("Unable to determine ZooKeeper ensemble"); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperManager.java hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperManager.java new file mode 100644 index 0000000..7e5f1ae --- /dev/null +++ hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperManager.java @@ -0,0 +1,233 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.zookeeper; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooKeeper; + +import com.netflix.curator.ensemble.EnsembleProvider; +import com.netflix.curator.framework.CuratorFramework; +import com.netflix.curator.framework.CuratorFrameworkFactory; +import com.netflix.curator.retry.ExponentialBackoffRetry; +import com.netflix.curator.utils.ZookeeperFactory; + +/** + * Manages the ZooKeeper connection. + */ +@InterfaceAudience.Private +class ZooKeeperManager { + private static final Log LOG = LogFactory.getLog(ZooKeeperManager.class); + + private volatile CuratorFramework curatorClient; + private ManagedZooKeeperFactory zkFactory; + private Watcher zkWatcher; + + private String connectString; + private int sessionTimeout; + private int maxRetries; + private int retryIntervalMillis; + private boolean canBeReadOnly; + + /** + * Contains a set of watchers and forwards the events to all. + */ + static class WatcherSet implements Watcher { + private Watcher[] watchers; + + public WatcherSet(Watcher zkWatcher, Watcher curatorWatcher) { + this.watchers = new Watcher[] {zkWatcher, curatorWatcher}; + } + + @Override + public void process(WatchedEvent event) { + for (Watcher watcher: watchers) { + LOG.debug("WatcherSet: sending event:" + event + " to watcher: " + watcher); + watcher.process(event); + } + } + + Watcher getCuratorWatcher() { + return watchers[1]; + } + } + + static class ManagedZooKeeper extends ZooKeeper { + public ManagedZooKeeper(String connectString, int sessionTimeout, + Watcher watcher, boolean canBeReadOnly) throws IOException { + super(connectString, sessionTimeout, watcher, canBeReadOnly); + } + @Override + public synchronized void close() throws InterruptedException { + //close is no-op since we do not want curator to close the connection + } + public void shutDown() throws InterruptedException { + super.close(); + } + } + + /** + * In HBase, we want to manage our own ZooKeeper connection, and we do not want curator + * to recreate the connection. This class bridges bridges curator and our zk-management. 
+ */ + static class ManagedZooKeeperFactory implements ZookeeperFactory { + class ManagedEnsembleProvider implements EnsembleProvider { + @Override + public void start() throws Exception { + } + @Override + public String getConnectionString() { + return String.valueOf(zk.getSessionId()); + } + @Override + public void close() throws IOException { + } + } + + ZooKeeperWatcher zkWatcher; + ManagedZooKeeper zk = null; + ManagedZooKeeper lastZk = null; + ManagedEnsembleProvider ensembleProvider; + WatcherSet doubleWatcher; + + ManagedZooKeeperFactory(ZooKeeperWatcher zkWatcher) { + this.zkWatcher = zkWatcher; + this.ensembleProvider = new ManagedEnsembleProvider(); + } + @Override + public synchronized ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher curatorWatcher, + boolean canBeReadOnly) throws Exception { + LOG.debug("Curator requested newZookeeper()"); + if (doubleWatcher != null) { + return zk; + } + // Wait for the new zookeeper to be initialized from HBase + while (this.zk == lastZk || this.zk == null) { + this.wait(); + } + this.doubleWatcher = new WatcherSet(zkWatcher, curatorWatcher); + this.zk.register(doubleWatcher); + + // HACK: send the last connection event to curator so that it initializes + // it's state + WatchedEvent lastEvent = zkWatcher.getLastConnectionEvent(); + if (LOG.isDebugEnabled()) { + LOG.debug("Sending last zookeeper event to curator: " + lastEvent); + } + if (lastEvent != null) { + curatorWatcher.process(lastEvent); + } + + return this.zk; + } + + public synchronized void close() throws InterruptedException { + if (this.zk != null) { + this.zk.shutDown(); + this.zk = null; + } + } + + public synchronized void connect(String connectString, int sessionTimeout, Watcher watcher, + boolean canBeReadOnly) throws IOException { + synchronized (this) { + Watcher w = watcher; + if (this.doubleWatcher != null) { + //assume the watcher from Curator does not change while the CuratorFramework is alive + w = this.doubleWatcher = new WatcherSet(watcher, doubleWatcher.getCuratorWatcher()); + } + this.lastZk = zk; + this.zk = new ManagedZooKeeper(connectString, sessionTimeout, w, canBeReadOnly); + this.notify(); + } + } + } + + public ZooKeeperManager(ZooKeeperWatcher zkWatcher, String connectString, int sessionTimeout, + int maxRetries, int retryIntervalMillis, boolean canBeReadOnly) { + this.zkWatcher = zkWatcher; + this.connectString = connectString; + this.sessionTimeout = sessionTimeout; + this.maxRetries = maxRetries; + this.retryIntervalMillis = retryIntervalMillis; + this.canBeReadOnly = canBeReadOnly; + this.zkFactory = new ManagedZooKeeperFactory(zkWatcher); + } + + /** + * Initializes the curator framework. We call this only when we need it, so that + * curator is not initialized from the client side. 
+ */ + private synchronized void initializeCurator() { + if (this.curatorClient == null) { + this.curatorClient = CuratorFrameworkFactory.builder() + .sessionTimeoutMs(sessionTimeout) + .zookeeperFactory(zkFactory) + .ensembleProvider(zkFactory.ensembleProvider) + .retryPolicy(new ExponentialBackoffRetry(retryIntervalMillis, maxRetries)) + .canBeReadOnly(canBeReadOnly) + .build(); + this.curatorClient.start(); + } + } + + public void connect() throws IOException { + this.zkFactory.connect(connectString, sessionTimeout, zkWatcher, canBeReadOnly); + + } + + public void reconnect() throws IOException, InterruptedException { + zkFactory.close(); //we don't want to close the CuratorClient + connect(); + } + + /** + * Returns the managed CuratorFramework instance. This function ensures that + * before returning the curator instance is initialized + * @return CuratorFramework instance + */ + public CuratorFramework getCuratorClient() { + if (curatorClient == null) { + //note: since curatorClient is volatile, this double checked locking + // is not broken under JDK5+ (http://en.wikipedia.org/wiki/Double-checked_locking) + initializeCurator(); + } + return this.curatorClient; + } + + /** + * Returns the underlying ZooKeeper instance + * @return ZooKeeper object + */ + public ZooKeeper getZooKeeper() { + return this.zkFactory.zk; + } + + public void close() throws InterruptedException { + if (this.curatorClient != null) { + this.curatorClient.close(); + } + zkFactory.close(); + } +} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java index 128a0d9..07d3634 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java @@ -104,6 +104,9 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable { // znode containing the state of the load balancer public String balancerZNode; + //Keep around the last even about the connection + private WatchedEvent lastConnectionEvent; + // Certain ZooKeeper nodes need to be world-readable public static final ArrayList CREATOR_ALL_AND_WORLD_READABLE = new ArrayList() { { @@ -320,6 +323,7 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable { * @param event */ private void connectionEvent(WatchedEvent event) { + this.lastConnectionEvent = event; switch(event.getState()) { case SyncConnected: // Now, this callback can be invoked before the this.zookeeper is set. @@ -377,10 +381,10 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable { if (this.abortable != null) this.abortable.abort(msg, new KeeperException.SessionExpiredException()); break; - + case ConnectedReadOnly: - break; - + break; + default: throw new IllegalStateException("Received event is not valid."); } @@ -459,7 +463,7 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable { public void abort(String why, Throwable e) { this.abortable.abort(why, e); } - + @Override public boolean isAborted() { return this.abortable.isAborted(); @@ -471,4 +475,12 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable { public String getMasterAddressZNode() { return this.masterAddressZNode; } + + /** + * Returns the last connection event that has been received. 
+ * @return a WatchedEvent that has been received + */ + public WatchedEvent getLastConnectionEvent() { + return lastConnectionEvent; + } } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKLeaderManager.java hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKLeaderManager.java index 99a214d..2452fb1 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKLeaderManager.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKLeaderManager.java @@ -18,14 +18,22 @@ package org.apache.hadoop.hbase.zookeeper; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.util.Bytes; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -58,7 +66,7 @@ public class TestZKLeaderManager { } private static class MockLeader extends Thread implements Stoppable { - private boolean stopped; + private volatile boolean stopped; private ZooKeeperWatcher watcher; private ZKLeaderManager zkLeader; private AtomicBoolean master = new AtomicBoolean(false); diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZooKeeperManager.java hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZooKeeperManager.java new file mode 100644 index 0000000..efa9ecf --- /dev/null +++ hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZooKeeperManager.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.zookeeper; + +import java.io.IOException; +import java.lang.reflect.Field; + +import junit.framework.Assert; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.MediumTests; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.netflix.curator.framework.CuratorFramework; +import com.netflix.curator.framework.imps.CuratorFrameworkState; + +@Category(MediumTests.class) +public class TestZooKeeperManager { + private static final Log LOG = LogFactory.getLog(TestZooKeeperManager.class); + + HBaseTestingUtility util; + + @Before + public void setUp() throws Exception { + util = new HBaseTestingUtility(); + util.startMiniZKCluster(1); + } + + @After + public void tearDown() throws IOException { + util.shutdownMiniZKCluster(); + } + + /** Returns the value of a private field */ + @SuppressWarnings("unchecked") + T getField(O obj, String fieldName) throws SecurityException, NoSuchFieldException, + IllegalArgumentException, IllegalAccessException { + Class clazz = obj.getClass(); + Field field = clazz.getDeclaredField(fieldName); + field.setAccessible(true); + return (T) field.get(obj); + } + + @Test + public void testCuratorIsNotInitializedIfNotUsed() throws Exception { + ZooKeeperWatcher zkWatcher = util.getZooKeeperWatcher(); + //do a simple operation + zkWatcher.getRecoverableZooKeeper().exists(zkWatcher.baseZNode, false); + + RecoverableZooKeeper zk = zkWatcher.getRecoverableZooKeeper(); + + ZooKeeperManager manager = getField(zk, "zkManager"); + CuratorFramework curatorClient = getField(manager, "curatorClient"); + Assert.assertNull(curatorClient); + } + + @Test(timeout=20000) + public void testCuratorInitializationRightAfterZkWatcher() throws Exception { + ZooKeeperWatcher zkWatcher = util.getZooKeeperWatcher(); + CuratorFramework curatorClient = zkWatcher.getRecoverableZooKeeper().getCuratorClient(); + Assert.assertEquals(CuratorFrameworkState.STARTED, curatorClient.getState()); + Assert.assertNotNull(curatorClient.checkExists().forPath("/")); + } + + @Test(timeout=20000) + public void testCuratorInitializationThenSessionExpiration() throws Exception { + ZooKeeperWatcher zkWatcher = util.getZooKeeperWatcher(); + Assert.assertNotNull(zkWatcher.getRecoverableZooKeeper().exists("/", false)); + CuratorFramework curatorClient = zkWatcher.getRecoverableZooKeeper().getCuratorClient(); + Assert.assertEquals(CuratorFrameworkState.STARTED, curatorClient.getState()); + curatorClient.checkExists().forPath("/"); + + util.expireSession(zkWatcher); + Assert.assertEquals(CuratorFrameworkState.STARTED, curatorClient.getState()); + zkWatcher.reconnectAfterExpiration(); + Assert.assertNotNull(curatorClient.checkExists().forPath("/")); + } + + @Test(timeout=20000) + public void testCuratorInitializationAfterSessionExpiration() throws Exception { + ZooKeeperWatcher zkWatcher = util.getZooKeeperWatcher(); + Assert.assertNotNull(zkWatcher.getRecoverableZooKeeper().exists("/", false)); + util.expireSession(zkWatcher); + + CuratorFramework curatorClient = zkWatcher.getRecoverableZooKeeper().getCuratorClient(); + Assert.assertEquals(CuratorFrameworkState.STARTED, curatorClient.getState()); + zkWatcher.reconnectAfterExpiration(); + Assert.assertNotNull(curatorClient.checkExists().forPath("/")); + } +} diff --git pom.xml pom.xml index 3e30ff8..f4cb1c5 100644 --- pom.xml +++ 
pom.xml @@ -854,6 +854,7 @@ 1.9.0 2.4.1 1.0.1 + <curator.version>1.2.5</curator.version> 0.9.0 3.4.5 0.0.1-SNAPSHOT @@ -1219,6 +1220,11 @@ stax-api ${stax-api.version}
+ <dependency> + <groupId>com.netflix.curator</groupId> + <artifactId>curator-recipes</artifactId> + <version>${curator.version}</version> + </dependency> junit
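
With the curator-recipes dependency above and the new RecoverableZooKeeper.getCuratorClient() hook, server-side code can run Curator recipes over HBase's existing ZooKeeper session instead of opening a second connection. The sketch below shows one way that usage might look; it is an illustration only, and the watcher identifier, the null Abortable, and the lock znode path are made-up values that are not part of this patch.

import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;

import com.netflix.curator.framework.CuratorFramework;
import com.netflix.curator.framework.recipes.locks.InterProcessMutex;

public class CuratorLockSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Reuse HBase's managed ZooKeeper session; the identifier and null Abortable are illustrative.
    ZooKeeperWatcher watcher = new ZooKeeperWatcher(conf, "curator-lock-sketch", null);
    try {
      // Curator is started lazily on first use and rides on the same ZooKeeper connection.
      CuratorFramework curator = watcher.getRecoverableZooKeeper().getCuratorClient();

      // A curator-recipes distributed lock; the znode path is a hypothetical example.
      InterProcessMutex lock = new InterProcessMutex(curator, "/hbase/example-lock");
      if (lock.acquire(10, TimeUnit.SECONDS)) {
        try {
          // ... critical section ...
        } finally {
          lock.release();
        }
      }
    } finally {
      watcher.close();
    }
  }
}

Because ZooKeeperManager initializes Curator only on demand (see testCuratorIsNotInitializedIfNotUsed in TestZooKeeperManager above), callers that never ask for the Curator client pay no extra cost.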
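
The RecoverableZooKeeper class comment near the top of this patch explains why a SEQUENTIAL create is not idempotent and why an identifier has to be embedded in the znode name so a create that silently succeeded before a connection loss can be detected. A minimal standalone sketch of that technique against the plain ZooKeeper API follows; it is not the patch's own implementation (createSequential additionally prepends metadata and uses findPreviousSequentialNode), and the parent path and identifier are hypothetical.

import java.util.List;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

public class SequentialCreateSketch {
  /**
   * Creates "<parent>/x-<id>-NNNNNNNNNN", surviving connection loss: if the first
   * attempt may have gone through, the children are scanned for our identifier
   * before retrying, as described in the RecoverableZooKeeper class comment.
   */
  static String createSequential(ZooKeeper zk, String parent, String id, byte[] data)
      throws KeeperException, InterruptedException {
    String prefix = "x-" + id + "-";
    while (true) {
      try {
        return zk.create(parent + "/" + prefix, data, Ids.OPEN_ACL_UNSAFE,
            CreateMode.EPHEMERAL_SEQUENTIAL);
      } catch (KeeperException.ConnectionLossException e) {
        // The create may or may not have succeeded; look for a child carrying our id.
        List<String> children = zk.getChildren(parent, false);
        for (String child : children) {
          if (child.startsWith(prefix)) {
            return parent + "/" + child;
          }
        }
        // Not found: the earlier attempt did not succeed, so retrying is safe.
      }
    }
  }
}

A production version would also retry the getChildren() call itself on connection loss, which is what the retry loop in createSequential above takes care of.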