diff --git a/jdbc/pom.xml b/jdbc/pom.xml
index 9a569d1..385d6cc 100644
--- a/jdbc/pom.xml
+++ b/jdbc/pom.xml
@@ -97,6 +97,11 @@
+
+ org.apache.curator
+ curator-framework
+ ${curator.version}
+
diff --git a/jdbc/src/java/org/apache/hive/jdbc/Utils.java b/jdbc/src/java/org/apache/hive/jdbc/Utils.java
index d53cde7..3ed933a 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/Utils.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/Utils.java
@@ -102,6 +102,7 @@
// Non-configurable params:
// ZOOKEEPER_SESSION_TIMEOUT is not exposed as client configurable
static final int ZOOKEEPER_SESSION_TIMEOUT = 600 * 1000;
+ static final int ZOOKEEPER_CONNECTION_TIMEOUT = -1;
// Currently supports JKS keystore format
static final String SSL_TRUST_STORE_TYPE = "JKS";
diff --git a/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java b/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java
index 3e4f4e5..d515ce5 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java
@@ -25,9 +25,11 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.hive.jdbc.Utils.JdbcConnectionParams;
import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
public class ZooKeeperHiveClientHelper {
public static final Log LOG = LogFactory.getLog(ZooKeeperHiveClientHelper.class.getName());
@@ -59,14 +61,14 @@ static String getNextServerUriFromZooKeeper(JdbcConnectionParams connParams)
List serverHosts;
Random randomizer = new Random();
String serverNode;
- ZooKeeper zooKeeperClient = null;
- // Pick a random HiveServer2 host from the ZooKeeper namspace
+ CuratorFramework zooKeeperClient =
+ CuratorFrameworkFactory.builder().connectString(zooKeeperEnsemble)
+ .sessionTimeoutMs(JdbcConnectionParams.ZOOKEEPER_SESSION_TIMEOUT)
+ .connectionTimeoutMs(JdbcConnectionParams.ZOOKEEPER_CONNECTION_TIMEOUT)
+ .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
+ zooKeeperClient.start();
try {
- zooKeeperClient =
- new ZooKeeper(zooKeeperEnsemble, JdbcConnectionParams.ZOOKEEPER_SESSION_TIMEOUT,
- new ZooKeeperHiveClientHelper.DummyWatcher());
- // All the HiveServer2 host nodes that are in ZooKeeper currently
- serverHosts = zooKeeperClient.getChildren("/" + zooKeeperNamespace, false);
+ serverHosts = zooKeeperClient.getChildren().forPath("/" + zooKeeperNamespace);
// Remove the znodes we've already tried from this list
serverHosts.removeAll(connParams.getRejectedHostZnodePaths());
if (serverHosts.isEmpty()) {
@@ -76,22 +78,18 @@ static String getNextServerUriFromZooKeeper(JdbcConnectionParams connParams)
// Now pick a host randomly
serverNode = serverHosts.get(randomizer.nextInt(serverHosts.size()));
connParams.setCurrentHostZnodePath(serverNode);
- // Read the value from the node (UTF-8 enoded byte array) and convert it to a String
String serverUri =
- new String(zooKeeperClient.getData("/" + zooKeeperNamespace + "/" + serverNode, false,
- null), Charset.forName("UTF-8"));
+ new String(
+ zooKeeperClient.getData().forPath("/" + zooKeeperNamespace + "/" + serverNode),
+ Charset.forName("UTF-8"));
LOG.info("Selected HiveServer2 instance with uri: " + serverUri);
return serverUri;
} catch (Exception e) {
throw new ZooKeeperHiveClientException("Unable to read HiveServer2 uri from ZooKeeper", e);
} finally {
- // Try to close the client connection with ZooKeeper
+ // Close the client connection with ZooKeeper
if (zooKeeperClient != null) {
- try {
- zooKeeperClient.close();
- } catch (Exception e) {
- // No-op
- }
+ zooKeeperClient.close();
}
}
}
diff --git a/pom.xml b/pom.xml
index 69f4413..c829524 100644
--- a/pom.xml
+++ b/pom.xml
@@ -161,7 +161,7 @@
3.4.5
1.1
2.4.0
- 2.5.0
+ 2.6.0
@@ -473,13 +473,12 @@
-
- org.apache.curator
- curator-framework
- ${curator.version}
-
-
-
+
+ org.apache.curator
+ curator-framework
+ ${curator.version}
+
+
org.codehaus.groovy
groovy-all
${groovy.version}
diff --git a/service/pom.xml b/service/pom.xml
index 9d47a69..b9d3a40 100644
--- a/service/pom.xml
+++ b/service/pom.xml
@@ -86,6 +86,11 @@
libthrift
${libthrift.version}
+
+ org.apache.curator
+ curator-framework
+ ${curator.version}
+
org.apache.hive
diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java
index 0aab3f9..03c11e1 100644
--- a/service/src/java/org/apache/hive/service/server/HiveServer2.java
+++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java
@@ -32,6 +32,9 @@
import org.apache.commons.cli.ParseException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.hadoop.hive.common.LogUtils;
import org.apache.hadoop.hive.common.LogUtils.LogInitializationException;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -66,7 +69,7 @@
private CLIService cliService;
private ThriftCLIService thriftCLIService;
private String znodePath;
- private ZooKeeper zooKeeperClient;
+ private CuratorFramework zooKeeperClient;
private boolean registeredWithZooKeeper = false;
public HiveServer2() {
@@ -117,6 +120,7 @@ public static boolean isHTTPTransportMode(HiveConf hiveConf) {
private void addServerInstanceToZooKeeper(HiveConf hiveConf) throws Exception {
int zooKeeperSessionTimeout =
hiveConf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT);
+ int connectTimeoutMillis = -1;
String zooKeeperEnsemble = ZooKeeperHiveHelper.getQuorumServers(hiveConf);
String rootNamespace = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE);
String instanceURI = getServerInstanceURI(hiveConf);
@@ -126,14 +130,16 @@ private void addServerInstanceToZooKeeper(HiveConf hiveConf) throws Exception {
setUpAuthAndAcls(hiveConf, nodeAcls);
// Create a ZooKeeper client
zooKeeperClient =
- new ZooKeeper(zooKeeperEnsemble, zooKeeperSessionTimeout,
- new ZooKeeperHiveHelper.DummyWatcher());
+ CuratorFrameworkFactory.builder().connectString(zooKeeperEnsemble)
+ .sessionTimeoutMs(zooKeeperSessionTimeout).connectionTimeoutMs(connectTimeoutMillis)
+ .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
+ zooKeeperClient.start();
// Create the parent znodes recursively; ignore if the parent already exists.
// If pre-creating the parent on a kerberized cluster, ensure that you give ACLs,
// as explained in {@link #setUpAuthAndAcls(HiveConf, List) setUpAuthAndAcls}
try {
- ZooKeeperHiveHelper.createPathRecursively(zooKeeperClient, rootNamespace, nodeAcls,
- CreateMode.PERSISTENT);
+ zooKeeperClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
+ .withACL(nodeAcls).forPath(ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace);
LOG.info("Created the root name space: " + rootNamespace + " on ZooKeeper for HiveServer2");
} catch (KeeperException e) {
if (e.code() != KeeperException.Code.NODEEXISTS) {
@@ -149,18 +155,19 @@ private void addServerInstanceToZooKeeper(HiveConf hiveConf) throws Exception {
+ ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + "serverUri=" + instanceURI + ";"
+ "version=" + HiveVersionInfo.getVersion() + ";" + "sequence=";
znodePath =
- zooKeeperClient.create(pathPrefix, znodeDataUTF8, nodeAcls,
- CreateMode.EPHEMERAL_SEQUENTIAL);
+ zooKeeperClient.create().creatingParentsIfNeeded()
+ .withMode(CreateMode.EPHEMERAL_SEQUENTIAL).withACL(nodeAcls)
+ .forPath(pathPrefix, znodeDataUTF8);
setRegisteredWithZooKeeper(true);
// Set a watch on the znode
- if (zooKeeperClient.exists(znodePath, new DeRegisterWatcher()) == null) {
+ if (zooKeeperClient.checkExists().usingWatcher(new DeRegisterWatcher()).forPath(znodePath) == null) {
// No node exists, throw exception
throw new Exception("Unable to create znode for this HiveServer2 instance on ZooKeeper.");
}
LOG.info("Created a znode on ZooKeeper for HiveServer2 uri: " + instanceURI);
} catch (KeeperException e) {
LOG.fatal("Unable to create a znode for this server instance", e);
- throw new Exception(e);
+ throw (e);
}
}
@@ -337,22 +344,27 @@ static void deleteServerInstancesFromZooKeeper(String versionNumber) throws Exce
HiveConf hiveConf = new HiveConf();
int zooKeeperSessionTimeout =
hiveConf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT);
+ int connectTimeoutMillis = -1;
String zooKeeperEnsemble = ZooKeeperHiveHelper.getQuorumServers(hiveConf);
String rootNamespace = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE);
- ZooKeeper zooKeeperClient =
- new ZooKeeper(zooKeeperEnsemble, zooKeeperSessionTimeout,
- new ZooKeeperHiveHelper.DummyWatcher());
- // Get all znode paths
+ CuratorFramework zooKeeperClient =
+ CuratorFrameworkFactory.builder().connectString(zooKeeperEnsemble)
+ .sessionTimeoutMs(zooKeeperSessionTimeout).connectionTimeoutMs(connectTimeoutMillis)
+ .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
+ zooKeeperClient.start();
List znodePaths =
- zooKeeperClient.getChildren(ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace,
- false);
+ zooKeeperClient.getChildren().forPath(
+ ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace);
// Now for each path that is for the given versionNumber, delete the znode from ZooKeeper
for (String znodePath : znodePaths) {
if (znodePath.contains("version=" + versionNumber + ";")) {
- zooKeeperClient.delete(ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace
- + ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + znodePath, -1);
+ LOG.info("Removing the znode: " + znodePath + " from ZooKeeper");
+ zooKeeperClient.delete().forPath(
+ ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace
+ + ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + znodePath);
}
}
+ zooKeeperClient.close();
}
public static void main(String[] args) {