diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 7f4afd9..089b2ee 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1224,10 +1224,14 @@ "This param is to control whether or not only do lock on queries\n" + "that need to execute at least one mapred job."), + // Zookeeper related configs HIVE_ZOOKEEPER_QUORUM("hive.zookeeper.quorum", "", - "The list of ZooKeeper servers to talk to. This is only needed for read/write locks."), + "List of ZooKeeper servers (should be called ensemble instead of quorum) to talk to. This is needed for: " + + "1. Read/write locks," + + "2. When HiveServer2 supports service discovery via Zookeeper."), HIVE_ZOOKEEPER_CLIENT_PORT("hive.zookeeper.client.port", "2181", - "The port of ZooKeeper servers to talk to. This is only needed for read/write locks."), + "The port of ZooKeeper servers to talk to. If the list of Zookeeper servers specified in HIVE_ZOOKEEPER_QUORUM," + + "does not contain port numbers, this value is used."), HIVE_ZOOKEEPER_SESSION_TIMEOUT("hive.zookeeper.session.timeout", 600*1000, "ZooKeeper client's session timeout. The client is disconnected, and as a result, all locks released, \n" + "if a heartbeat is not sent in the timeout."), @@ -1417,11 +1421,6 @@ "If the property is set, the value must be a valid URI (java.net.URI, e.g. 
\"file:///tmp/my-logging.properties\"), \n" + "which you can then extract a URL from and pass to PropertyConfigurator.configure(URL)."), - // Hive global init file location - HIVE_GLOBAL_INIT_FILE_LOCATION("hive.server2.global.init.file.location", "${env:HIVE_CONF_DIR}", - "The location of HS2 global init file (.hiverc).\n" + - "If the property is reset, the value must be a valid path where the init file is located."), - // prefix used to auto generated column aliases (this should be started with '_') HIVE_AUTOGEN_COLUMNALIAS_PREFIX_LABEL("hive.autogen.columnalias.prefix.label", "_c", "String used as a prefix when auto generating column alias.\n" + @@ -1460,13 +1459,25 @@ "table. From 0.12 onwards, they are displayed separately. This flag will let you\n" + "get old behavior, if desired. See, test-case in patch for HIVE-6689."), + // HiveServer2 specific configs HIVE_SERVER2_MAX_START_ATTEMPTS("hive.server2.max.start.attempts", 30L, new RangeValidator(0L, Long.MAX_VALUE), "This number of times HiveServer2 will attempt to start before exiting, sleeping 60 seconds between retries. \n" + "The default of 30 will keep trying for 30 minutes."), - + HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY("hive.server2.support.dynamic.service.discovery", false, + "Whether HiveServer2 supports dynamic service discovery for its clients. " + + "To support this, each instance of HiveServer2 currently uses ZooKeeper to register itself when it is brought up." 
+ + "Correspondingly, JDBC/ODBC clients should use the ZooKeeper ensemble (HIVE_ZOOKEEPER_QUORUM) in their connection string."), + HIVE_SERVER2_ZOOKEEPER_NAMESPACE("hive.server2.zookeeper.namespace", "zk_hiveserver2", + "The parent node in ZooKeeper used by HiveServer2 when supporting dynamic service discovery."), + // HiveServer2 global init file location + HIVE_SERVER2_GLOBAL_INIT_FILE_LOCATION("hive.server2.global.init.file.location", "${env:HIVE_CONF_DIR}", + "The location of HS2 global init file (.hiverc).\n" + + "If the property is reset, the value must be a valid path where the init file is located."), HIVE_SERVER2_TRANSPORT_MODE("hive.server2.transport.mode", "binary", new StringSet("binary", "http"), "Server transport mode. \"binary\" or \"http\""), - + HIVE_SERVER2_THRIFT_BIND_HOST("hive.server2.thrift.bind.host", "", + "Bind host on which to run the HiveServer2 Thrift interface.\n" + + "Can be overridden by setting $HIVE_SERVER2_THRIFT_BIND_HOST"), // http (over thrift) transport settings HIVE_SERVER2_THRIFT_HTTP_PORT("hive.server2.thrift.http.port", 10001, "Port number when in HTTP mode."), @@ -1476,16 +1487,12 @@ "Minimum number of worker threads when in HTTP mode."), HIVE_SERVER2_THRIFT_HTTP_MAX_WORKER_THREADS("hive.server2.thrift.http.max.worker.threads", 500, "Maximum number of worker threads when in HTTP mode."), - HIVE_SERVER2_THRIFT_HTTP_MAX_IDLE_TIME("hive.server2.thrift.http.max.idle.time", 1800000, + HIVE_SERVER2_THRIFT_HTTP_MAX_IDLE_TIME("hive.server2.thrift.http.max.idle.time", 1800000, "Maximum idle time in milliseconds for a connection on the server when in HTTP mode."), - // binary transport settings HIVE_SERVER2_THRIFT_PORT("hive.server2.thrift.port", 10000, "Port number of HiveServer2 Thrift interface.\n" + "Can be overridden by setting $HIVE_SERVER2_THRIFT_PORT"), - HIVE_SERVER2_THRIFT_BIND_HOST("hive.server2.thrift.bind.host", "", - "Bind host on which to run the HiveServer2 Thrift interface.\n" + - "Can be overridden by setting 
$HIVE_SERVER2_THRIFT_BIND_HOST"), // hadoop.rpc.protection being set to a higher level than HiveServer2 // does not make sense in most situations. // HiveServer2 ignores hadoop.rpc.protection in favor of hive.server2.thrift.sasl.qop. @@ -1500,7 +1507,6 @@ "Minimum number of Thrift worker threads"), HIVE_SERVER2_THRIFT_MAX_WORKER_THREADS("hive.server2.thrift.max.worker.threads", 500, "Maximum number of Thrift worker threads"), - // Configuration for async thread pool in SessionManager HIVE_SERVER2_ASYNC_EXEC_THREADS("hive.server2.async.exec.threads", 100, "Number of threads in the async thread pool for HiveServer2"), @@ -1515,7 +1521,6 @@ HIVE_SERVER2_LONG_POLLING_TIMEOUT("hive.server2.long.polling.timeout", 5000L, "Time in milliseconds that HiveServer2 will wait,\n" + "before responding to asynchronous calls that use long polling"), - // HiveServer2 auth configuration HIVE_SERVER2_AUTHENTICATION("hive.server2.authentication", "NONE", new StringSet("NOSASL", "NONE", "LDAP", "KERBEROS", "PAM", "CUSTOM"), @@ -1562,7 +1567,6 @@ HIVE_SERVER2_PAM_SERVICES("hive.server2.authentication.pam.services", null, "List of the underlying pam services that should be used when auth type is PAM\n" + "A file with the same name must exist in /etc/pam.d"), - HIVE_SERVER2_ENABLE_DOAS("hive.server2.enable.doAs", true, "Setting this property to true will have HiveServer2 execute\n" + "Hive operations as the user making the calls to it."), diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java index cbcfec7..93675bb 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java +++ b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java @@ -20,6 +20,7 @@ import java.io.FileInputStream; import java.io.IOException; +import java.nio.charset.Charset; import java.security.KeyStore; import java.sql.Array; import java.sql.Blob; @@ -44,6 +45,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Properties; 
+import java.util.Random; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; @@ -79,6 +81,7 @@ import org.apache.thrift.transport.THttpClient; import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportException; +import org.apache.zookeeper.ZooKeeper; /** * HiveConnection. @@ -86,6 +89,7 @@ */ public class HiveConnection implements java.sql.Connection { public static final Log LOG = LogFactory.getLog(HiveConnection.class.getName()); + private static final String HIVE_AUTH_TYPE= "auth"; private static final String HIVE_AUTH_QOP = "sasl.qop"; private static final String HIVE_AUTH_SIMPLE = "noSasl"; @@ -102,10 +106,15 @@ private static final String HIVE_SSL_TRUST_STORE_PASSWORD = "trustStorePassword"; private static final String HIVE_SERVER2_TRANSPORT_MODE = "hive.server2.transport.mode"; private static final String HIVE_SERVER2_THRIFT_HTTP_PATH = "hive.server2.thrift.http.path"; + private static final String HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY = + "hive.server2.support.dynamic.service.discovery"; + private static final String HIVE_SERVER2_ZOOKEEPER_NAMESPACE = + "hive.server2.zookeeper.namespace"; + private static final int HIVE_SERVER2_ZOOKEEPER_SESSION_TIMEOUT = 600 * 1000; private static final String HIVE_VAR_PREFIX = "hivevar:"; private static final String HIVE_CONF_PREFIX = "hiveconf:"; // Currently supports JKS keystore format - // See HIVE-6286 (Add support for PKCS12 keystore format) + // TODO See HIVE-6286 (Add support for PKCS12 keystore format) private static final String HIVE_SSL_TRUST_STORE_TYPE = "JKS"; private final String jdbcURI; @@ -115,8 +124,9 @@ private final Map hiveConfMap; private final Map hiveVarMap; private final boolean isEmbeddedMode; + // TODO should be replaced by CliServiceClient private TTransport transport; - private TCLIService.Iface client; // todo should be replaced by CliServiceClient + private TCLIService.Iface client; private boolean isClosed = true; private 
SQLWarning warningChain = null; private TSessionHandle sessHandle = null; @@ -126,6 +136,10 @@ public HiveConnection(String uri, Properties info) throws SQLException { setupLoginTimeout(); + // Resolve the supplied uri to point to a specific HiveServer2 host:port + // When HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY is true, the supplied uri contains + // a ZooKeeper ensemble instead of an HiveServer2 host:port. + uri = resolveURI(uri); jdbcURI = uri; // parse the connection uri Utils.JdbcConnectionParams connParams; @@ -192,6 +206,129 @@ public HiveConnection(String uri, Properties info) throws SQLException { openSession(connParams); } + /** + * Resolves all URI indirections to return a uri with specific host:port info. Specifically, if + * HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY is true, picks a host randomly from the list of + * HiveServer2 hosts registered with ZooKeeper. + * @param uri + * @return + * @throws SQLException + */ + private String resolveURI(String uri) throws SQLException { + if (useDynamicServiceDiscovery(uri)) { + // Extract the ZooKeeper ensemble from the uri + String zooKeeperEnsemble = extractZooKeeperEnsemble(uri); + String serverZooKeeperNamespace = extractZooKeeperNamespace(uri); + List serverHosts; + Random randomizer = new Random(); + String serverNode; + // Pick a random HiveServer2 host from the ZooKeeper namspace + try { + ZooKeeper zooKeeperClient = new ZooKeeper(zooKeeperEnsemble, + HIVE_SERVER2_ZOOKEEPER_SESSION_TIMEOUT, new ZooKeeperHiveClientHelper.DummyWatcher()); + // All the HiveServer2 host nodes + serverHosts = zooKeeperClient.getChildren("/" + serverZooKeeperNamespace, false); + serverNode = serverHosts.get(randomizer.nextInt(serverHosts.size())); + // Read the value from the node (UTF-8 enoded byte array) and convert it to a String + String serverURI = new String(zooKeeperClient.getData("/" + serverZooKeeperNamespace + "/" + + serverNode, false, null), Charset.forName("UTF-8")); + LOG.info("Selected HiveServer2 
instance with uri: " + serverURI); + // Now return a JDBC uri of the format: jdbc:hive2://hiveserver2-host:port/..., + // where hiveserver2-host:port is the serverURI + return uri.replace(zooKeeperEnsemble, serverURI); + } catch (Exception e) { + throw new SQLException("Unable to resolve JDBC uri: " + uri, e); + } + } + else { + return uri; + } + } + + private boolean useDynamicServiceDiscovery(String uri) { + if (uri.contains(HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY)) { + String paramValue = getParamValueFromURI(uri, + HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY); + return "true".equalsIgnoreCase(paramValue); + } + return false; + } + + /** + * Extract the ZooKeeper ensemble that the client connects to. + * + * @param uri + * @return + * @throws SQLException + */ + private String extractZooKeeperEnsemble(String uri) throws SQLException { + // JDBC uri should start with "jdbc://hive2" + if (!uri.startsWith(Utils.URL_PREFIX)) { + throw new SQLException("Bad URL format: Missing prefix " + Utils.URL_PREFIX); + } + if (uri.equalsIgnoreCase(Utils.URL_PREFIX)) { + throw new SQLException("Embedded JDBC mode does not support dynamic service discovery"); + } + // For a jdbc uri like: jdbc:hive2://host1:port1,host2:port2,host3:port3/ + // Extract the ZooKeeper ensemble starting after "jdbc:hive2://", till the 1st "/" + int fromIndex = Utils.URL_PREFIX.length(); + int toIndex = uri.indexOf("/", fromIndex); + if (toIndex < 0) { + throw new SQLException("Bad URL format: Missing a '/' after the hostname."); + } + String zooKeeperEnsemble = uri.substring(fromIndex, uri.indexOf("/", fromIndex)); + if ((zooKeeperEnsemble == null) || zooKeeperEnsemble.isEmpty()) { + throw new SQLException("Please specify the list of ZooKeeper hosts to connect to, " + + "in the format: jdbc:hive2://host1:port1,host2:port2,host3:port3/"); + } + return zooKeeperEnsemble; + } + + /** + * Extract the HiveServer2 ZooKeeper namespace to be used. 
+ * + * @param uri + * @return + * @throws SQLException + */ + private String extractZooKeeperNamespace(String uri) throws SQLException { + // Extract the ZooKeeper namespace for HiveServer2 from the uri + String serverZooKeeperNamespace = getParamValueFromURI(uri, HIVE_SERVER2_ZOOKEEPER_NAMESPACE); + if ((serverZooKeeperNamespace == null) || serverZooKeeperNamespace.isEmpty()) { + throw new SQLException("Please specify the HiveServer2 namespace on ZooKeeper that should " + + "be used to read server uris from."); + } + return serverZooKeeperNamespace; + } + + /** + * Given a uri containing a param-value pair, returns the value part for the param. Returns null + * if the param doesn't exist in the uri. + * + * @param uri + * @param paramName + * @return + */ + private String getParamValueFromURI(String uri, String paramName) { + // Params can be of the form: + // param=value; OR + // param=value + String nameValueString[]; + int fromIndex = uri.indexOf(paramName); + // paramName doesn't exist in the uri + if (fromIndex < 0) { + return null; + } + int toIndex = uri.indexOf(";", fromIndex); + if (toIndex < 0) { + // => this is the last param in the uri + nameValueString = uri.substring(fromIndex).split("="); + } else { + nameValueString = uri.substring(fromIndex, uri.indexOf(";", fromIndex)).split("="); + } + return nameValueString[1]; + } + private void openTransport() throws SQLException { // TODO: Refactor transport creation to a factory, it's getting uber messy here transport = isHttpTransportMode() ? createHttpTransport() : createBinaryTransport(); diff --git a/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java b/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java new file mode 100644 index 0000000..a6fa97e --- /dev/null +++ b/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.jdbc; + +import org.apache.zookeeper.Watcher; + +public class ZooKeeperHiveClientHelper { + + /** + * A no-op watcher class + */ + public static class DummyWatcher implements Watcher { + public void process(org.apache.zookeeper.WatchedEvent event) { + } + } + +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java index 46044d0..b724e00 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java @@ -18,27 +18,48 @@ package org.apache.hadoop.hive.ql.lockmgr.zookeeper; -import com.google.common.annotations.VisibleForTesting; +import java.io.IOException; +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + import org.apache.commons.logging.Log; import 
org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.ErrorMsg; -import org.apache.hadoop.hive.ql.lockmgr.*; +import org.apache.hadoop.hive.ql.lockmgr.HiveLock; +import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager; +import org.apache.hadoop.hive.ql.lockmgr.HiveLockManagerCtx; +import org.apache.hadoop.hive.ql.lockmgr.HiveLockMode; +import org.apache.hadoop.hive.ql.lockmgr.HiveLockObj; +import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject; import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject.HiveLockObjectData; -import org.apache.hadoop.hive.ql.metadata.*; +import org.apache.hadoop.hive.ql.lockmgr.LockException; +import org.apache.hadoop.hive.ql.metadata.DummyPartition; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.metadata.Partition; +import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; +import org.apache.hadoop.hive.ql.util.ZooKeeperHiveHelper; import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.Stat; -import java.io.IOException; -import java.net.InetAddress; -import java.util.*; -import java.util.regex.Matcher; -import java.util.regex.Pattern; +import com.google.common.annotations.VisibleForTesting; public class ZooKeeperHiveLockManager implements HiveLockManager { HiveLockManagerCtx ctx; @@ -72,31 +93,6 @@ public ZooKeeperHiveLockManager() { } /** - * @param conf The hive configuration - * Get the quorum server address from the configuration. The format is: - * host1:port, host2:port.. 
- **/ - @VisibleForTesting - static String getQuorumServers(HiveConf conf) { - String[] hosts = conf.getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_QUORUM).split(","); - String port = conf.getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT); - StringBuilder quorum = new StringBuilder(); - for(int i=0; i= numRetriesForUnLock) { String name = ((ZooKeeperHiveLock)hiveLock).getPath(); - LOG.error("Node " + name + " can not be deleted after " + numRetriesForUnLock + " attempts."); + LOG.error("Node " + name + " can not be deleted after " + numRetriesForUnLock + " attempts."); throw new LockException(e); } } @@ -493,7 +489,7 @@ static void unlockPrimitive(HiveConf conf, ZooKeeper zkpClient, zkpClient.delete(name, -1); } } catch (KeeperException.NoNodeException nne) { - //can happen in retrying deleting the zLock after exceptions like InterruptedException + //can happen in retrying deleting the zLock after exceptions like InterruptedException //or in a race condition where parent has already been deleted by other process when it //is to be deleted. Both cases should not raise error LOG.debug("Node " + zLock.getPath() + " or its parent has already been deleted."); @@ -501,7 +497,7 @@ static void unlockPrimitive(HiveConf conf, ZooKeeper zkpClient, //can happen in a race condition where another process adds a zLock under this parent //just before it is about to be deleted. 
It should not be a problem since this parent //can eventually be deleted by the process which hold its last child zLock - LOG.debug("Node " + name + " to be deleted is not empty."); + LOG.debug("Node " + name + " to be deleted is not empty."); } catch (Exception e) { //exceptions including InterruptException and other KeeperException LOG.error("Failed to release ZooKeeper lock: ", e); @@ -514,9 +510,9 @@ public static void releaseAllLocks(HiveConf conf) throws Exception { ZooKeeper zkpClient = null; try { int sessionTimeout = conf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT); - String quorumServers = getQuorumServers(conf); - Watcher dummWatcher = new DummyWatcher(); - zkpClient = new ZooKeeper(quorumServers, sessionTimeout, dummWatcher); + String quorumServers = ZooKeeperHiveHelper.getQuorumServers(conf); + Watcher dummyWatcher = new ZooKeeperHiveHelper.DummyWatcher(); + zkpClient = new ZooKeeper(quorumServers, sessionTimeout, dummyWatcher); String parent = conf.getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_NAMESPACE); List locks = getLocks(conf, zkpClient, null, parent, false, false); Exception lastExceptionGot = null; @@ -626,7 +622,8 @@ public static void releaseAllLocks(HiveConf conf) throws Exception { if (fetchData) { try { - data = new HiveLockObjectData(new String(zkpClient.getData(curChild, new DummyWatcher(), null))); + data = new HiveLockObjectData(new String(zkpClient.getData(curChild, + new ZooKeeperHiveHelper.DummyWatcher(), null))); data.setClientIp(clientIp); } catch (Exception e) { LOG.error("Error in getting data for " + curChild, e); @@ -786,11 +783,6 @@ private static HiveLockMode getLockMode(HiveConf conf, String path) { return null; } - public static class DummyWatcher implements Watcher { - public void process(org.apache.zookeeper.WatchedEvent event) { - } - } - @Override public void prepareRetry() throws LockException { try { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/util/ZooKeeperHiveHelper.java 
b/ql/src/java/org/apache/hadoop/hive/ql/util/ZooKeeperHiveHelper.java new file mode 100644 index 0000000..509e858 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/util/ZooKeeperHiveHelper.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.util; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.zookeeper.Watcher; + +public class ZooKeeperHiveHelper { + + /** + * @param conf The hive configuration + * Get the ensemble server addresses from the configuration. The format is: + * host1:port, host2:port.. 
+ **/ + public static String getQuorumServers(HiveConf conf) { + String[] hosts = conf.getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_QUORUM).split(","); + String port = conf.getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT); + StringBuilder quorum = new StringBuilder(); + for(int i=0; i(); public OperationManager() { - super("OperationManager"); + super(OperationManager.class.getSimpleName()); } @Override diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java index bc0a02c..4840182 100644 --- a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java +++ b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java @@ -152,8 +152,8 @@ private void processGlobalInitFile() { IHiveFileProcessor processor = new GlobalHivercFileProcessor(); try { - if (hiveConf.getVar(ConfVars.HIVE_GLOBAL_INIT_FILE_LOCATION) != null) { - String hiverc = hiveConf.getVar(ConfVars.HIVE_GLOBAL_INIT_FILE_LOCATION) + if (hiveConf.getVar(ConfVars.HIVE_SERVER2_GLOBAL_INIT_FILE_LOCATION) != null) { + String hiverc = hiveConf.getVar(ConfVars.HIVE_SERVER2_GLOBAL_INIT_FILE_LOCATION) + File.separator + SessionManager.HIVERCFILE; if (new File(hiverc).exists()) { LOG.info("Running global init file: " + hiverc); diff --git a/service/src/java/org/apache/hive/service/cli/session/SessionManager.java b/service/src/java/org/apache/hive/service/cli/session/SessionManager.java index d573592..b477f1f 100644 --- a/service/src/java/org/apache/hive/service/cli/session/SessionManager.java +++ b/service/src/java/org/apache/hive/service/cli/session/SessionManager.java @@ -54,7 +54,7 @@ private ThreadPoolExecutor backgroundOperationPool; public SessionManager() { - super("SessionManager"); + super(SessionManager.class.getSimpleName()); } @Override diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java 
b/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java index 37b05fc..4efcf17 100644 --- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java +++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java @@ -18,8 +18,6 @@ package org.apache.hive.service.cli.thrift; -import java.net.InetSocketAddress; - import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.shims.ShimLoader; @@ -35,7 +33,7 @@ public class ThriftBinaryCLIService extends ThriftCLIService { public ThriftBinaryCLIService(CLIService cliService) { - super(cliService, "ThriftBinaryCLIService"); + super(cliService, ThriftBinaryCLIService.class.getSimpleName()); } @Override @@ -45,27 +43,6 @@ public void run() { TTransportFactory transportFactory = hiveAuthFactory.getAuthTransFactory(); TProcessorFactory processorFactory = hiveAuthFactory.getAuthProcFactory(this); - String portString = System.getenv("HIVE_SERVER2_THRIFT_PORT"); - if (portString != null) { - portNum = Integer.valueOf(portString); - } else { - portNum = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_PORT); - } - - String hiveHost = System.getenv("HIVE_SERVER2_THRIFT_BIND_HOST"); - if (hiveHost == null) { - hiveHost = hiveConf.getVar(ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST); - } - - if (hiveHost != null && !hiveHost.isEmpty()) { - serverAddress = new InetSocketAddress(hiveHost, portNum); - } else { - serverAddress = new InetSocketAddress(portNum); - } - - minWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MIN_WORKER_THREADS); - maxWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MAX_WORKER_THREADS); - TServerSocket serverSocket = null; if (!hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_USE_SSL)) { serverSocket = HiveAuthFactory.getServerSocket(hiveHost, portNum); @@ -88,13 +65,14 @@ public void run() { .maxWorkerThreads(maxWorkerThreads); server = new 
TThreadPoolServer(sargs); - - LOG.info("ThriftBinaryCLIService listening on " + serverAddress); - server.serve(); - + String msg = "Started " + ThriftBinaryCLIService.class.getSimpleName() + " on port " + + portNum + " with " + minWorkerThreads + "..." + maxWorkerThreads + " worker threads"; + LOG.info(msg); } catch (Throwable t) { - LOG.error("Error: ", t); + LOG.fatal("Error starting HiveServer2: could not start " + + ThriftBinaryCLIService.class.getSimpleName(), t); + System.exit(-1); } } diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java index 027931e..9dcf11c 100644 --- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java +++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java @@ -43,6 +43,7 @@ import org.apache.hive.service.cli.SessionHandle; import org.apache.hive.service.cli.TableSchema; import org.apache.hive.service.cli.session.SessionManager; +import org.apache.hive.service.server.HiveServer2; import org.apache.thrift.TException; import org.apache.thrift.server.TServer; @@ -60,6 +61,7 @@ protected int portNum; protected InetSocketAddress serverAddress; + protected String hiveHost; protected TServer server; protected org.eclipse.jetty.server.Server httpServer; @@ -81,6 +83,37 @@ public ThriftCLIService(CLIService cliService, String serviceName) { @Override public synchronized void init(HiveConf hiveConf) { this.hiveConf = hiveConf; + minWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MIN_WORKER_THREADS); + maxWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MAX_WORKER_THREADS); + + String portString; + hiveHost = System.getenv("HIVE_SERVER2_THRIFT_BIND_HOST"); + if (hiveHost == null) { + hiveHost = hiveConf.getVar(ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST); + } + // HTTP mode + if (HiveServer2.isHTTPTransportMode(hiveConf)) { + portString = 
System.getenv("HIVE_SERVER2_THRIFT_HTTP_PORT"); + if (portString != null) { + portNum = Integer.valueOf(portString); + } else { + portNum = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT); + } + } + // Binary mode + else { + portString = System.getenv("HIVE_SERVER2_THRIFT_PORT"); + if (portString != null) { + portNum = Integer.valueOf(portString); + } else { + portNum = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_PORT); + } + } + if (hiveHost != null && !hiveHost.isEmpty()) { + serverAddress = new InetSocketAddress(hiveHost, portNum); + } else { + serverAddress = new InetSocketAddress(portNum); + } super.init(hiveConf); } @@ -113,6 +146,14 @@ public synchronized void stop() { super.stop(); } + public int getPortNumber() { + return portNum; + } + + public InetSocketAddress getServerAddress() { + return serverAddress; + } + @Override public TGetDelegationTokenResp GetDelegationToken(TGetDelegationTokenReq req) throws TException { diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java index c380b69..407b1ee 100644 --- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java +++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java @@ -42,7 +42,7 @@ public class ThriftHttpCLIService extends ThriftCLIService { public ThriftHttpCLIService(CLIService cliService) { - super(cliService, "ThriftHttpCLIService"); + super(cliService, ThriftHttpCLIService.class.getSimpleName()); } @Override @@ -53,20 +53,12 @@ public void run() { // a gateway may cause actual target URL to differ, e.g. 
http://gateway:port/hive2/servlets/thrifths2/ verifyHttpConfiguration(hiveConf); + + httpServer = new org.eclipse.jetty.server.Server(serverAddress); - String portString = System.getenv("HIVE_SERVER2_THRIFT_HTTP_PORT"); - if (portString != null) { - portNum = Integer.valueOf(portString); - } else { - portNum = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT); - } - - minWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_MIN_WORKER_THREADS); - maxWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_MAX_WORKER_THREADS); - - String httpPath = getHttpPath(hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH)); + String httpPath = getHttpPath(hiveConf.getVar( + HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH)); - httpServer = new org.eclipse.jetty.server.Server(); QueuedThreadPool threadPool = new QueuedThreadPool(); threadPool.setMinThreads(minWorkerThreads); threadPool.setMaxThreads(maxWorkerThreads); @@ -95,14 +87,14 @@ public void run() { sslContextFactory.setKeyStorePassword(keyStorePassword); connector = new SslSelectChannelConnector(sslContextFactory); } - + connector.setPort(portNum); // Linux:yes, Windows:no connector.setReuseAddress(!Shell.WINDOWS); - + int maxIdleTime = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_MAX_IDLE_TIME); connector.setMaxIdleTime(maxIdleTime); - + httpServer.addConnector(connector); hiveAuthFactory = new HiveAuthFactory(hiveConf); @@ -123,13 +115,15 @@ public void run() { // TODO: check defaults: maxTimeout, keepalive, maxBodySize, bodyRecieveDuration, etc. httpServer.start(); - String msg = "Started ThriftHttpCLIService in " + schemeName + " mode on port " + portNum + - " path=" + httpPath + - " with " + minWorkerThreads + ".." + maxWorkerThreads + " worker threads"; + String msg = "Started " + ThriftHttpCLIService.class.getSimpleName() + " in " + schemeName + + " mode on port " + portNum + " path=" + httpPath + " with " + minWorkerThreads + "..." 
+ + maxWorkerThreads + " worker threads"; LOG.info(msg); httpServer.join(); } catch (Throwable t) { - LOG.error("Error: ", t); + LOG.fatal("Error starting HiveServer2: could not start " + + ThriftHttpCLIService.class.getSimpleName(), t); + System.exit(-1); } } diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java index 0864dfb..469f96c 100644 --- a/service/src/java/org/apache/hive/service/server/HiveServer2.java +++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java @@ -18,6 +18,8 @@ package org.apache.hive.service.server; +import java.nio.charset.Charset; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.common.LogUtils; @@ -25,12 +27,17 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager; +import org.apache.hadoop.hive.ql.util.ZooKeeperHiveHelper; import org.apache.hive.common.util.HiveStringUtils; import org.apache.hive.service.CompositeService; import org.apache.hive.service.cli.CLIService; import org.apache.hive.service.cli.thrift.ThriftBinaryCLIService; import org.apache.hive.service.cli.thrift.ThriftCLIService; import org.apache.hive.service.cli.thrift.ThriftHttpCLIService; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.ZooKeeper; /** * HiveServer2. 
@@ -41,9 +48,11 @@ private CLIService cliService; private ThriftCLIService thriftCLIService; + private String znodePath; + private ZooKeeper zooKeeperClient; public HiveServer2() { - super("HiveServer2"); + super(HiveServer2.class.getSimpleName()); HiveConf.setLoadHiveServer2Config(true); } @@ -52,22 +61,67 @@ public HiveServer2() { public synchronized void init(HiveConf hiveConf) { cliService = new CLIService(); addService(cliService); - - String transportMode = System.getenv("HIVE_SERVER2_TRANSPORT_MODE"); - if(transportMode == null) { - transportMode = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_TRANSPORT_MODE); - } - if(transportMode != null && (transportMode.equalsIgnoreCase("http"))) { + if (isHTTPTransportMode(hiveConf)) { thriftCLIService = new ThriftHttpCLIService(cliService); } else { thriftCLIService = new ThriftBinaryCLIService(cliService); } - addService(thriftCLIService); super.init(hiveConf); } + public static boolean isHTTPTransportMode(HiveConf hiveConf) { + String transportMode = System.getenv("HIVE_SERVER2_TRANSPORT_MODE"); + if (transportMode == null) { + transportMode = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_TRANSPORT_MODE); + } + if (transportMode != null && (transportMode.equalsIgnoreCase("http"))) { + return true; + } + return false; + } + + /** + * @param hiveConf configuration read for the ZooKeeper ensemble, session timeout and root namespace + * @throws Exception if the znode for this server instance cannot be created on ZooKeeper + */ + private void addServerInstanceForDiscovery(HiveConf hiveConf) throws Exception { + int zooKeeperSessionTimeout = hiveConf + .getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT); + String zooKeeperEnsemble = ZooKeeperHiveHelper.getQuorumServers(hiveConf); + String rootNamespace = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE); + String instanceURI = getServerInstanceURI(hiveConf); + byte[] znodeDataUTF8 = instanceURI.getBytes(Charset.forName("UTF-8")); + zooKeeperClient = new ZooKeeper(zooKeeperEnsemble, zooKeeperSessionTimeout, + new ZooKeeperHiveHelper.DummyWatcher()); + + // Create the parent
znode; ignore if the parent already exists + try { + zooKeeperClient.create("/" + rootNamespace, new byte[0], Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + LOG.info("Created the root namespace: " + rootNamespace + " on ZooKeeper for HiveServer2"); + } catch (KeeperException e) { + if (e.code() != KeeperException.Code.NODEEXISTS) { + LOG.warn("Unexpected ZK exception when creating parent node /" + rootNamespace, e); + } + } + // Create a znode under the rootNamespace parent for this instance of the server + try { + znodePath = zooKeeperClient.create("/" + rootNamespace + "/server", znodeDataUTF8, + Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); + LOG.info("Created a znode on ZooKeeper for HiveServer2 URI: " + instanceURI); + } catch (KeeperException e) { + LOG.error("Unable to create a znode for this server instance", e); + throw new Exception(e); + } + } + + private String getServerInstanceURI(HiveConf hiveConf) { + return thriftCLIService.getServerAddress().getHostName() + ":" + + thriftCLIService.getPortNumber(); + } + @Override public synchronized void start() { super.start(); @@ -99,6 +153,11 @@ private static void startHiveServer2() throws Throwable { server = new HiveServer2(); server.init(hiveConf); server.start(); + // If we're supporting dynamic service discovery, we'll add the service URI for this + // HiveServer2 instance to ZooKeeper as a znode.
+ if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY)) { + server.addServerInstanceForDiscovery(hiveConf); + } if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS)) { TezSessionPoolManager sessionPool = TezSessionPoolManager.getInstance(); sessionPool.setupPool(hiveConf); diff --git a/service/src/test/org/apache/hive/service/cli/session/TestSessionGlobalInitFile.java b/service/src/test/org/apache/hive/service/cli/session/TestSessionGlobalInitFile.java index 66fc1fc..5b1cbc0 100644 --- a/service/src/test/org/apache/hive/service/cli/session/TestSessionGlobalInitFile.java +++ b/service/src/test/org/apache/hive/service/cli/session/TestSessionGlobalInitFile.java @@ -27,7 +27,11 @@ import org.apache.commons.io.FileUtils; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hive.service.cli.*; +import org.apache.hive.service.cli.CLIService; +import org.apache.hive.service.cli.ICLIService; +import org.apache.hive.service.cli.OperationHandle; +import org.apache.hive.service.cli.RowSet; +import org.apache.hive.service.cli.SessionHandle; import org.apache.hive.service.cli.thrift.ThriftBinaryCLIService; import org.apache.hive.service.cli.thrift.ThriftCLIServiceClient; import org.junit.After; @@ -83,7 +87,7 @@ public void setUp() throws Exception { // set up service and client HiveConf hiveConf = new HiveConf(); - hiveConf.setVar(HiveConf.ConfVars.HIVE_GLOBAL_INIT_FILE_LOCATION, + hiveConf.setVar(HiveConf.ConfVars.HIVE_SERVER2_GLOBAL_INIT_FILE_LOCATION, initFile.getParentFile().getAbsolutePath()); service = new FakeEmbeddedThriftBinaryCLIService(hiveConf); service.init(new HiveConf());