diff --git a/jdbc/pom.xml b/jdbc/pom.xml index 9a569d1..385d6cc 100644 --- a/jdbc/pom.xml +++ b/jdbc/pom.xml @@ -97,6 +97,11 @@ + + org.apache.curator + curator-framework + ${curator.version} + diff --git a/jdbc/src/java/org/apache/hive/jdbc/Utils.java b/jdbc/src/java/org/apache/hive/jdbc/Utils.java index d53cde7..3ed933a 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/Utils.java +++ b/jdbc/src/java/org/apache/hive/jdbc/Utils.java @@ -102,6 +102,7 @@ // Non-configurable params: // ZOOKEEPER_SESSION_TIMEOUT is not exposed as client configurable static final int ZOOKEEPER_SESSION_TIMEOUT = 600 * 1000; + static final int ZOOKEEPER_CONNECTION_TIMEOUT = -1; // Currently supports JKS keystore format static final String SSL_TRUST_STORE_TYPE = "JKS"; diff --git a/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java b/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java index 3e4f4e5..d515ce5 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java +++ b/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java @@ -25,9 +25,11 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.hive.jdbc.Utils.JdbcConnectionParams; import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZooKeeper; public class ZooKeeperHiveClientHelper { public static final Log LOG = LogFactory.getLog(ZooKeeperHiveClientHelper.class.getName()); @@ -59,14 +61,14 @@ static String getNextServerUriFromZooKeeper(JdbcConnectionParams connParams) List serverHosts; Random randomizer = new Random(); String serverNode; - ZooKeeper zooKeeperClient = null; - // Pick a random HiveServer2 host from the ZooKeeper namspace + CuratorFramework zooKeeperClient = + CuratorFrameworkFactory.builder().connectString(zooKeeperEnsemble) + .sessionTimeoutMs(JdbcConnectionParams.ZOOKEEPER_SESSION_TIMEOUT) + .connectionTimeoutMs(JdbcConnectionParams.ZOOKEEPER_CONNECTION_TIMEOUT) + .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); + zooKeeperClient.start(); try { - zooKeeperClient = - new ZooKeeper(zooKeeperEnsemble, JdbcConnectionParams.ZOOKEEPER_SESSION_TIMEOUT, - new ZooKeeperHiveClientHelper.DummyWatcher()); - // All the HiveServer2 host nodes that are in ZooKeeper currently - serverHosts = zooKeeperClient.getChildren("/" + zooKeeperNamespace, false); + serverHosts = zooKeeperClient.getChildren().forPath("/" + zooKeeperNamespace); // Remove the znodes we've already tried from this list serverHosts.removeAll(connParams.getRejectedHostZnodePaths()); if (serverHosts.isEmpty()) { @@ -76,22 +78,18 @@ static String getNextServerUriFromZooKeeper(JdbcConnectionParams connParams) // Now pick a host randomly serverNode = serverHosts.get(randomizer.nextInt(serverHosts.size())); connParams.setCurrentHostZnodePath(serverNode); - // Read the value from the node (UTF-8 enoded byte array) and convert it to a String String serverUri = - new String(zooKeeperClient.getData("/" + zooKeeperNamespace + "/" + serverNode, false, - null), Charset.forName("UTF-8")); + new String( + zooKeeperClient.getData().forPath("/" + zooKeeperNamespace + "/" + serverNode), + Charset.forName("UTF-8")); LOG.info("Selected HiveServer2 instance with uri: " + serverUri); return serverUri; } catch (Exception e) { throw new ZooKeeperHiveClientException("Unable to read HiveServer2 uri from ZooKeeper", e); } finally { - // Try to close the client connection with ZooKeeper + // Close the client connection with ZooKeeper if (zooKeeperClient != null) { - try { - zooKeeperClient.close(); - } catch (Exception e) { - // No-op - } + zooKeeperClient.close(); } } } diff --git a/pom.xml b/pom.xml index 69f4413..c829524 100644 --- a/pom.xml +++ b/pom.xml @@ -161,7 +161,7 @@ 3.4.5 1.1 2.4.0 - 2.5.0 + 2.6.0 @@ -473,13 +473,12 @@ - - org.apache.curator - curator-framework - ${curator.version} - - - + + org.apache.curator + curator-framework + ${curator.version} + + org.codehaus.groovy groovy-all ${groovy.version} diff --git a/service/pom.xml b/service/pom.xml index 9d47a69..b9d3a40 100644 --- a/service/pom.xml +++ b/service/pom.xml @@ -86,6 +86,11 @@ libthrift ${libthrift.version} + + org.apache.curator + curator-framework + ${curator.version} + org.apache.hive diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java index 0aab3f9..03c11e1 100644 --- a/service/src/java/org/apache/hive/service/server/HiveServer2.java +++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java @@ -32,6 +32,9 @@ import org.apache.commons.cli.ParseException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.hadoop.hive.common.LogUtils; import org.apache.hadoop.hive.common.LogUtils.LogInitializationException; import org.apache.hadoop.hive.conf.HiveConf; @@ -66,7 +69,7 @@ private CLIService cliService; private ThriftCLIService thriftCLIService; private String znodePath; - private ZooKeeper zooKeeperClient; + private CuratorFramework zooKeeperClient; private boolean registeredWithZooKeeper = false; public HiveServer2() { @@ -117,6 +120,7 @@ public static boolean isHTTPTransportMode(HiveConf hiveConf) { private void addServerInstanceToZooKeeper(HiveConf hiveConf) throws Exception { int zooKeeperSessionTimeout = hiveConf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT); + int connectTimeoutMillis = -1; String zooKeeperEnsemble = ZooKeeperHiveHelper.getQuorumServers(hiveConf); String rootNamespace = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE); String instanceURI = getServerInstanceURI(hiveConf); @@ -126,14 +130,16 @@ private void addServerInstanceToZooKeeper(HiveConf hiveConf) throws Exception { setUpAuthAndAcls(hiveConf, nodeAcls); // Create a ZooKeeper client zooKeeperClient = - new ZooKeeper(zooKeeperEnsemble, zooKeeperSessionTimeout, - new ZooKeeperHiveHelper.DummyWatcher()); + CuratorFrameworkFactory.builder().connectString(zooKeeperEnsemble) + .sessionTimeoutMs(zooKeeperSessionTimeout).connectionTimeoutMs(connectTimeoutMillis) + .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); + zooKeeperClient.start(); // Create the parent znodes recursively; ignore if the parent already exists. // If pre-creating the parent on a kerberized cluster, ensure that you give ACLs, // as explained in {@link #setUpAuthAndAcls(HiveConf, List) setUpAuthAndAcls} try { - ZooKeeperHiveHelper.createPathRecursively(zooKeeperClient, rootNamespace, nodeAcls, - CreateMode.PERSISTENT); + zooKeeperClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT) + .withACL(nodeAcls).forPath(ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace); LOG.info("Created the root name space: " + rootNamespace + " on ZooKeeper for HiveServer2"); } catch (KeeperException e) { if (e.code() != KeeperException.Code.NODEEXISTS) { @@ -149,18 +155,19 @@ private void addServerInstanceToZooKeeper(HiveConf hiveConf) throws Exception { + ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + "serverUri=" + instanceURI + ";" + "version=" + HiveVersionInfo.getVersion() + ";" + "sequence="; znodePath = - zooKeeperClient.create(pathPrefix, znodeDataUTF8, nodeAcls, - CreateMode.EPHEMERAL_SEQUENTIAL); + zooKeeperClient.create().creatingParentsIfNeeded() + .withMode(CreateMode.EPHEMERAL_SEQUENTIAL).withACL(nodeAcls) + .forPath(pathPrefix, znodeDataUTF8); setRegisteredWithZooKeeper(true); // Set a watch on the znode - if (zooKeeperClient.exists(znodePath, new DeRegisterWatcher()) == null) { + if (zooKeeperClient.checkExists().usingWatcher(new DeRegisterWatcher()).forPath(znodePath) == null) { // No node exists, throw exception throw new Exception("Unable to create znode for this HiveServer2 instance on ZooKeeper."); } LOG.info("Created a znode on ZooKeeper for HiveServer2 uri: " + instanceURI); } catch (KeeperException e) { LOG.fatal("Unable to create a znode for this server instance", e); - throw new Exception(e); + throw (e); } } @@ -337,22 +344,27 @@ static void deleteServerInstancesFromZooKeeper(String versionNumber) throws Exce HiveConf hiveConf = new HiveConf(); int zooKeeperSessionTimeout = hiveConf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT); + int connectTimeoutMillis = -1; String zooKeeperEnsemble = ZooKeeperHiveHelper.getQuorumServers(hiveConf); String rootNamespace = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE); - ZooKeeper zooKeeperClient = - new ZooKeeper(zooKeeperEnsemble, zooKeeperSessionTimeout, - new ZooKeeperHiveHelper.DummyWatcher()); - // Get all znode paths + CuratorFramework zooKeeperClient = + CuratorFrameworkFactory.builder().connectString(zooKeeperEnsemble) + .sessionTimeoutMs(zooKeeperSessionTimeout).connectionTimeoutMs(connectTimeoutMillis) + .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); + zooKeeperClient.start(); List znodePaths = - zooKeeperClient.getChildren(ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace, - false); + zooKeeperClient.getChildren().forPath( + ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace); // Now for each path that is for the given versionNumber, delete the znode from ZooKeeper for (String znodePath : znodePaths) { if (znodePath.contains("version=" + versionNumber + ";")) { - zooKeeperClient.delete(ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace - + ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + znodePath, -1); + LOG.info("Removing the znode: " + znodePath + " from ZooKeeper"); + zooKeeperClient.delete().forPath( + ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace + + ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + znodePath); } } + zooKeeperClient.close(); } public static void main(String[] args) {