diff --git a/jdbc/pom.xml b/jdbc/pom.xml index 9a569d1..385d6cc 100644 --- a/jdbc/pom.xml +++ b/jdbc/pom.xml @@ -97,6 +97,11 @@ + + org.apache.curator + curator-framework + ${curator.version} + diff --git a/jdbc/src/java/org/apache/hive/jdbc/Utils.java b/jdbc/src/java/org/apache/hive/jdbc/Utils.java index d53cde7..3ed933a 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/Utils.java +++ b/jdbc/src/java/org/apache/hive/jdbc/Utils.java @@ -102,6 +102,7 @@ // Non-configurable params: // ZOOKEEPER_SESSION_TIMEOUT is not exposed as client configurable static final int ZOOKEEPER_SESSION_TIMEOUT = 600 * 1000; + static final int ZOOKEEPER_CONNECTION_TIMEOUT = -1; // Currently supports JKS keystore format static final String SSL_TRUST_STORE_TYPE = "JKS"; diff --git a/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java b/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java index 3e4f4e5..d515ce5 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java +++ b/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java @@ -25,9 +25,11 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.hive.jdbc.Utils.JdbcConnectionParams; import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZooKeeper; public class ZooKeeperHiveClientHelper { public static final Log LOG = LogFactory.getLog(ZooKeeperHiveClientHelper.class.getName()); @@ -59,14 +61,14 @@ static String getNextServerUriFromZooKeeper(JdbcConnectionParams connParams) List serverHosts; Random randomizer = new Random(); String serverNode; - ZooKeeper zooKeeperClient = null; - // Pick a random HiveServer2 host from the ZooKeeper namspace + CuratorFramework zooKeeperClient = + CuratorFrameworkFactory.builder().connectString(zooKeeperEnsemble) + .sessionTimeoutMs(JdbcConnectionParams.ZOOKEEPER_SESSION_TIMEOUT) + .connectionTimeoutMs(JdbcConnectionParams.ZOOKEEPER_CONNECTION_TIMEOUT) + .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); + zooKeeperClient.start(); try { - zooKeeperClient = - new ZooKeeper(zooKeeperEnsemble, JdbcConnectionParams.ZOOKEEPER_SESSION_TIMEOUT, - new ZooKeeperHiveClientHelper.DummyWatcher()); - // All the HiveServer2 host nodes that are in ZooKeeper currently - serverHosts = zooKeeperClient.getChildren("/" + zooKeeperNamespace, false); + serverHosts = zooKeeperClient.getChildren().forPath("/" + zooKeeperNamespace); // Remove the znodes we've already tried from this list serverHosts.removeAll(connParams.getRejectedHostZnodePaths()); if (serverHosts.isEmpty()) { @@ -76,22 +78,18 @@ static String getNextServerUriFromZooKeeper(JdbcConnectionParams connParams) // Now pick a host randomly serverNode = serverHosts.get(randomizer.nextInt(serverHosts.size())); connParams.setCurrentHostZnodePath(serverNode); - // Read the value from the node (UTF-8 enoded byte array) and convert it to a String String serverUri = - new String(zooKeeperClient.getData("/" + zooKeeperNamespace + "/" + serverNode, false, - null), Charset.forName("UTF-8")); + new String( + zooKeeperClient.getData().forPath("/" + zooKeeperNamespace + "/" + serverNode), + Charset.forName("UTF-8")); LOG.info("Selected HiveServer2 instance with uri: " + serverUri); return serverUri; } catch (Exception e) { throw new ZooKeeperHiveClientException("Unable to read HiveServer2 uri from ZooKeeper", e); } finally { - // Try to close the client connection with ZooKeeper + // Close the client connection with ZooKeeper if (zooKeeperClient != null) { - try { - zooKeeperClient.close(); - } catch (Exception e) { - // No-op - } + zooKeeperClient.close(); } } } diff --git a/pom.xml b/pom.xml index 424a079..bd74830 100644 --- a/pom.xml +++ b/pom.xml @@ -473,13 +473,12 @@ - - org.apache.curator - curator-framework - ${curator.version} - - - + + org.apache.curator + curator-framework + ${curator.version} + + org.codehaus.groovy groovy-all ${groovy.version} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/util/ZooKeeperHiveHelper.java b/ql/src/java/org/apache/hadoop/hive/ql/util/ZooKeeperHiveHelper.java index 29d0531..f6cb8ac 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/util/ZooKeeperHiveHelper.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/util/ZooKeeperHiveHelper.java @@ -18,17 +18,10 @@ package org.apache.hadoop.hive.ql.util; -import java.util.List; - -import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.data.ACL; public class ZooKeeperHiveHelper { public static final Log LOG = LogFactory.getLog(ZooKeeperHiveHelper.class.getName()); @@ -59,34 +52,6 @@ public static String getQuorumServers(HiveConf conf) { return quorum.toString(); } - - /** - * Create a path on ZooKeeper, if it does not already exist ("mkdir -p") - * - * @param zooKeeperClient ZooKeeper session - * @param path string with ZOOKEEPER_PATH_SEPARATOR as the separator - * @param acl list of ACL entries - * @param createMode for create mode of each node in the patch - * @return - * @throws KeeperException - * @throws InterruptedException - */ - public static String createPathRecursively(ZooKeeper zooKeeperClient, String path, List acl, - CreateMode createMode) throws KeeperException, InterruptedException { - String[] pathComponents = StringUtils.splitByWholeSeparator(path, ZOOKEEPER_PATH_SEPARATOR); - String currentPath = ""; - for (String pathComponent : pathComponents) { - currentPath += ZOOKEEPER_PATH_SEPARATOR + pathComponent; - try { - String node = zooKeeperClient.create(currentPath, new byte[0], acl, createMode); - LOG.info("Created path: " + node); - } catch (KeeperException.NodeExistsException e) { - // Do nothing here - } - } - return currentPath; - } - /** * A no-op watcher class */ @@ -95,5 +60,4 @@ public static String createPathRecursively(ZooKeeper zooKeeperClient, String pat public void process(org.apache.zookeeper.WatchedEvent event) { } } - } diff --git a/service/pom.xml b/service/pom.xml index 9d47a69..b9d3a40 100644 --- a/service/pom.xml +++ b/service/pom.xml @@ -86,6 +86,11 @@ libthrift ${libthrift.version} + + org.apache.curator + curator-framework + ${curator.version} + org.apache.hive diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java index 3c37f24..b814e4b 100644 --- a/service/src/java/org/apache/hive/service/server/HiveServer2.java +++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java @@ -32,6 +32,10 @@ import org.apache.commons.cli.ParseException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.api.ACLProvider; +import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.hadoop.hive.common.LogUtils; import org.apache.hadoop.hive.common.LogUtils.LogInitializationException; import org.apache.hadoop.hive.conf.HiveConf; @@ -52,7 +56,6 @@ import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooDefs.Perms; -import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.ACL; /** @@ -65,7 +68,7 @@ private CLIService cliService; private ThriftCLIService thriftCLIService; private String znodePath; - private ZooKeeper zooKeeperClient; + private CuratorFramework zooKeeperClient; private boolean registeredWithZooKeeper = false; public HiveServer2() { @@ -73,7 +76,6 @@ public HiveServer2() { HiveConf.setLoadHiveServer2Config(true); } - @Override public synchronized void init(HiveConf hiveConf) { cliService = new CLIService(this); @@ -108,6 +110,33 @@ public static boolean isHTTPTransportMode(HiveConf hiveConf) { } /** + * ACLProvider for providing appropriate ACLs to CuratorFrameworkFactory + */ + private final ACLProvider zooKeeperAclProvider = new ACLProvider() { + List nodeAcls = new ArrayList(); + + @Override + public List getDefaultAcl() { + if (ShimLoader.getHadoopShims().isSecurityEnabled()) { + // Read all to the world + nodeAcls.addAll(Ids.READ_ACL_UNSAFE); + // Create/Delete/Write/Admin to the authenticated user + nodeAcls.add(new ACL(Perms.ALL, Ids.AUTH_IDS)); + } else { + // ACLs for znodes on a non-kerberized cluster + // Create/Read/Delete/Write/Admin to the world + nodeAcls.addAll(Ids.OPEN_ACL_UNSAFE); + } + return nodeAcls; + } + + @Override + public List getAclForPath(String path) { + return getDefaultAcl(); + } + }; + + /** * Adds a server instance to ZooKeeper as a znode. * * @param hiveConf @@ -116,28 +145,29 @@ public static boolean isHTTPTransportMode(HiveConf hiveConf) { private void addServerInstanceToZooKeeper(HiveConf hiveConf) throws Exception { int zooKeeperSessionTimeout = hiveConf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT); + int connectTimeoutMillis = -1; String zooKeeperEnsemble = ZooKeeperHiveHelper.getQuorumServers(hiveConf); String rootNamespace = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE); String instanceURI = getServerInstanceURI(hiveConf); byte[] znodeDataUTF8 = instanceURI.getBytes(Charset.forName("UTF-8")); - // Znode ACLs - List nodeAcls = new ArrayList(); - setUpAuthAndAcls(hiveConf, nodeAcls); - // Create a ZooKeeper client + setUpZooKeeperAuth(hiveConf); + // Create a CuratorFramework instance to be used as the ZooKeeper client + // Use the zooKeeperAclProvider to create appropriate ACLs zooKeeperClient = - new ZooKeeper(zooKeeperEnsemble, zooKeeperSessionTimeout, - new ZooKeeperHiveHelper.DummyWatcher()); + CuratorFrameworkFactory.builder().connectString(zooKeeperEnsemble) + .sessionTimeoutMs(zooKeeperSessionTimeout).connectionTimeoutMs(connectTimeoutMillis) + .aclProvider(zooKeeperAclProvider).retryPolicy(new ExponentialBackoffRetry(1000, 3)) + .build(); + zooKeeperClient.start(); // Create the parent znodes recursively; ignore if the parent already exists. - // If pre-creating the parent on a kerberized cluster, ensure that you give ACLs, - // as explained in {@link #setUpAuthAndAcls(HiveConf, List) setUpAuthAndAcls} try { - ZooKeeperHiveHelper.createPathRecursively(zooKeeperClient, rootNamespace, nodeAcls, - CreateMode.PERSISTENT); + zooKeeperClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT) + .forPath(ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace); LOG.info("Created the root name space: " + rootNamespace + " on ZooKeeper for HiveServer2"); } catch (KeeperException e) { if (e.code() != KeeperException.Code.NODEEXISTS) { LOG.fatal("Unable to create HiveServer2 namespace: " + rootNamespace + " on ZooKeeper", e); - throw (e); + throw e; } } // Create a znode under the rootNamespace parent for this instance of the server @@ -148,56 +178,40 @@ private void addServerInstanceToZooKeeper(HiveConf hiveConf) throws Exception { + ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + "serverUri=" + instanceURI + ";" + "version=" + HiveVersionInfo.getVersion() + ";" + "sequence="; znodePath = - zooKeeperClient.create(pathPrefix, znodeDataUTF8, nodeAcls, - CreateMode.EPHEMERAL_SEQUENTIAL); + zooKeeperClient.create().creatingParentsIfNeeded() + .withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(pathPrefix, znodeDataUTF8); setRegisteredWithZooKeeper(true); // Set a watch on the znode - if (zooKeeperClient.exists(znodePath, new DeRegisterWatcher()) == null) { + if (zooKeeperClient.checkExists().usingWatcher(new DeRegisterWatcher()).forPath(znodePath) == null) { // No node exists, throw exception throw new Exception("Unable to create znode for this HiveServer2 instance on ZooKeeper."); } LOG.info("Created a znode on ZooKeeper for HiveServer2 uri: " + instanceURI); } catch (KeeperException e) { LOG.fatal("Unable to create a znode for this server instance", e); - throw new Exception(e); + throw (e); } } /** - * Set up ACLs for znodes based on whether the cluster is secure or not. - * On a kerberized cluster, ZooKeeper performs Kerberos-SASL authentication. - * We give Read privilege to the world, but Create/Delete/Write/Admin to the authenticated user. - * On a non-kerberized cluster, we give Create/Read/Delete/Write/Admin privileges to the world. + * For a kerberized cluster, we dynamically set up the client's JAAS conf. * - * For a kerberized cluster, we also dynamically set up the client's JAAS conf. * @param hiveConf - * @param nodeAcls * @return * @throws Exception */ - private void setUpAuthAndAcls(HiveConf hiveConf, List nodeAcls) throws Exception { + private void setUpZooKeeperAuth(HiveConf hiveConf) throws Exception { if (ShimLoader.getHadoopShims().isSecurityEnabled()) { String principal = hiveConf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL); if (principal.isEmpty()) { - throw new IOException( - "HiveServer2 Kerberos principal is empty"); + throw new IOException("HiveServer2 Kerberos principal is empty"); } String keyTabFile = hiveConf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB); if (keyTabFile.isEmpty()) { - throw new IOException( - "HiveServer2 Kerberos keytab is empty"); + throw new IOException("HiveServer2 Kerberos keytab is empty"); } - // Install the JAAS Configuration for the runtime ShimLoader.getHadoopShims().setZookeeperClientKerberosJaasConfig(principal, keyTabFile); - // Read all to the world - nodeAcls.addAll(Ids.READ_ACL_UNSAFE); - // Create/Delete/Write/Admin to the authenticated user - nodeAcls.add(new ACL(Perms.ALL, Ids.AUTH_IDS)); - } else { - // ACLs for znodes on a non-kerberized cluster - // Create/Read/Delete/Write/Admin to the world - nodeAcls.addAll(Ids.OPEN_ACL_UNSAFE); } } @@ -333,22 +347,27 @@ static void deleteServerInstancesFromZooKeeper(String versionNumber) throws Exce HiveConf hiveConf = new HiveConf(); int zooKeeperSessionTimeout = hiveConf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT); + int connectTimeoutMillis = -1; String zooKeeperEnsemble = ZooKeeperHiveHelper.getQuorumServers(hiveConf); String rootNamespace = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE); - ZooKeeper zooKeeperClient = - new ZooKeeper(zooKeeperEnsemble, zooKeeperSessionTimeout, - new ZooKeeperHiveHelper.DummyWatcher()); - // Get all znode paths + CuratorFramework zooKeeperClient = + CuratorFrameworkFactory.builder().connectString(zooKeeperEnsemble) + .sessionTimeoutMs(zooKeeperSessionTimeout).connectionTimeoutMs(connectTimeoutMillis) + .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); + zooKeeperClient.start(); List znodePaths = - zooKeeperClient.getChildren(ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace, - false); + zooKeeperClient.getChildren().forPath( + ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace); // Now for each path that is for the given versionNumber, delete the znode from ZooKeeper for (String znodePath : znodePaths) { if (znodePath.contains("version=" + versionNumber + ";")) { - zooKeeperClient.delete(ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace - + ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + znodePath, -1); + LOG.info("Removing the znode: " + znodePath + " from ZooKeeper"); + zooKeeperClient.delete().forPath( + ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace + + ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + znodePath); } } + zooKeeperClient.close(); } public static void main(String[] args) { @@ -503,8 +522,8 @@ public void execute() { } /** - * DeregisterOptionExecutor: executes the --deregister option by - * deregistering all HiveServer2 instances from ZooKeeper of a specific version. + * DeregisterOptionExecutor: executes the --deregister option by deregistering all HiveServer2 + * instances from ZooKeeper of a specific version. */ static class DeregisterOptionExecutor implements ServerOptionsExecutor { private final String versionNumber; @@ -526,4 +545,3 @@ public void execute() { } } } -