diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 5d2e6b0..1fb39b2 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1243,10 +1243,15 @@
"This param is to control whether or not only do lock on queries\n" +
"that need to execute at least one mapred job."),
+ // Zookeeper related configs
HIVE_ZOOKEEPER_QUORUM("hive.zookeeper.quorum", "",
- "The list of ZooKeeper servers to talk to. This is only needed for read/write locks."),
+ "List of ZooKeeper servers to talk to. This is needed for: " +
+ "1. Read/write locks, " +
+ "2. When HiveServer2 supports service discovery via Zookeeper."),
HIVE_ZOOKEEPER_CLIENT_PORT("hive.zookeeper.client.port", "2181",
- "The port of ZooKeeper servers to talk to. This is only needed for read/write locks."),
+ "The port of ZooKeeper servers to talk to. " +
+ "If the list of Zookeeper servers specified in ${hive.zookeeper.quorum}," +
+ "does not contain port numbers, this value is used."),
HIVE_ZOOKEEPER_SESSION_TIMEOUT("hive.zookeeper.session.timeout", 600*1000,
"ZooKeeper client's session timeout. The client is disconnected, and as a result, all locks released, \n" +
"if a heartbeat is not sent in the timeout."),
@@ -1446,11 +1451,6 @@
"If the property is set, the value must be a valid URI (java.net.URI, e.g. \"file:///tmp/my-logging.properties\"), \n" +
"which you can then extract a URL from and pass to PropertyConfigurator.configure(URL)."),
- // Hive global init file location
- HIVE_GLOBAL_INIT_FILE_LOCATION("hive.server2.global.init.file.location", "${env:HIVE_CONF_DIR}",
- "The location of HS2 global init file (.hiverc).\n" +
- "If the property is reset, the value must be a valid path where the init file is located."),
-
// prefix used to auto generated column aliases (this should be started with '_')
HIVE_AUTOGEN_COLUMNALIAS_PREFIX_LABEL("hive.autogen.columnalias.prefix.label", "_c",
"String used as a prefix when auto generating column alias.\n" +
@@ -1489,12 +1489,25 @@
"table. From 0.12 onwards, they are displayed separately. This flag will let you\n" +
"get old behavior, if desired. See, test-case in patch for HIVE-6689."),
+ // HiveServer2 specific configs
HIVE_SERVER2_MAX_START_ATTEMPTS("hive.server2.max.start.attempts", 30L, new RangeValidator(0L, null),
- "This number of times HiveServer2 will attempt to start before exiting, sleeping 60 seconds between retries. \n" +
- "The default of 30 will keep trying for 30 minutes."),
-
+ "Number of times HiveServer2 will attempt to start before exiting, sleeping 60 seconds " +
+ "between retries. \n The default of 30 will keep trying for 30 minutes."),
+ HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY("hive.server2.support.dynamic.service.discovery", false,
+ "Whether HiveServer2 supports dynamic service discovery for its clients. " +
+ "To support this, each instance of HiveServer2 currently uses ZooKeeper to register itself, " +
+ "when it is brought up. JDBC/ODBC clients should use the ZooKeeper ensemble: " +
+ "${hive.zookeeper.quorum} in their connection string."),
+ HIVE_SERVER2_ZOOKEEPER_NAMESPACE("hive.server2.zookeeper.namespace", "hiveserver2",
+ "The parent node in ZooKeeper used by HiveServer2 when supporting dynamic service discovery."),
+ // HiveServer2 global init file location
+ HIVE_SERVER2_GLOBAL_INIT_FILE_LOCATION("hive.server2.global.init.file.location", "${env:HIVE_CONF_DIR}",
+ "The location of HS2 global init file (.hiverc).\n" +
+ "If the property is reset, the value must be a valid path where the init file is located."),
HIVE_SERVER2_TRANSPORT_MODE("hive.server2.transport.mode", "binary", new StringSet("binary", "http"),
"Transport mode of HiveServer2."),
+ HIVE_SERVER2_THRIFT_BIND_HOST("hive.server2.thrift.bind.host", "",
+ "Bind host on which to run the HiveServer2 Thrift service."),
// http (over thrift) transport settings
HIVE_SERVER2_THRIFT_HTTP_PORT("hive.server2.thrift.http.port", 10001,
@@ -1517,9 +1530,6 @@
HIVE_SERVER2_THRIFT_PORT("hive.server2.thrift.port", 10000,
"Port number of HiveServer2 Thrift interface.\n" +
"Can be overridden by setting $HIVE_SERVER2_THRIFT_PORT"),
- HIVE_SERVER2_THRIFT_BIND_HOST("hive.server2.thrift.bind.host", "",
- "Bind host on which to run the HiveServer2 Thrift interface.\n" +
- "Can be overridden by setting $HIVE_SERVER2_THRIFT_BIND_HOST"),
// hadoop.rpc.protection being set to a higher level than HiveServer2
// does not make sense in most situations.
// HiveServer2 ignores hadoop.rpc.protection in favor of hive.server2.thrift.sasl.qop.
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
index ae128a9..6fd4826 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
@@ -1654,7 +1654,7 @@ public void testDriverProperties() throws SQLException {
};
@Test
- public void testParseUrlHttpMode() throws SQLException {
+ public void testParseUrlHttpMode() throws SQLException, JdbcUriParseException, ZooKeeperHiveClientException {
new HiveDriver();
for (String[] testValues : HTTP_URL_PROPERTIES) {
JdbcConnectionParams params = Utils.parseURL(testValues[0]);
diff --git a/jdbc/pom.xml b/jdbc/pom.xml
index 1ad13a7..0132f12 100644
--- a/jdbc/pom.xml
+++ b/jdbc/pom.xml
@@ -80,6 +80,17 @@
libthrift
${libthrift.version}
+
+ org.apache.zookeeper
+ zookeeper
+ ${zookeeper.version}
+
+
+ org.jboss.netty
+ netty
+
+
+
diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java
index cbcfec7..874b1a4 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java
@@ -53,6 +53,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hive.jdbc.Utils.JdbcConnectionParams;
import org.apache.hive.service.auth.HiveAuthFactory;
import org.apache.hive.service.auth.KerberosSaslHelper;
import org.apache.hive.service.auth.PlainSaslHelper;
@@ -86,37 +87,20 @@
*/
public class HiveConnection implements java.sql.Connection {
public static final Log LOG = LogFactory.getLog(HiveConnection.class.getName());
- private static final String HIVE_AUTH_TYPE= "auth";
- private static final String HIVE_AUTH_QOP = "sasl.qop";
- private static final String HIVE_AUTH_SIMPLE = "noSasl";
- private static final String HIVE_AUTH_TOKEN = "delegationToken";
- private static final String HIVE_AUTH_USER = "user";
- private static final String HIVE_AUTH_PRINCIPAL = "principal";
- private static final String HIVE_AUTH_PASSWD = "password";
- private static final String HIVE_AUTH_KERBEROS_AUTH_TYPE = "kerberosAuthType";
- private static final String HIVE_AUTH_KERBEROS_AUTH_TYPE_FROM_SUBJECT = "fromSubject";
- private static final String HIVE_ANONYMOUS_USER = "anonymous";
- private static final String HIVE_ANONYMOUS_PASSWD = "anonymous";
- private static final String HIVE_USE_SSL = "ssl";
- private static final String HIVE_SSL_TRUST_STORE = "sslTrustStore";
- private static final String HIVE_SSL_TRUST_STORE_PASSWORD = "trustStorePassword";
- private static final String HIVE_SERVER2_TRANSPORT_MODE = "hive.server2.transport.mode";
- private static final String HIVE_SERVER2_THRIFT_HTTP_PATH = "hive.server2.thrift.http.path";
private static final String HIVE_VAR_PREFIX = "hivevar:";
private static final String HIVE_CONF_PREFIX = "hiveconf:";
- // Currently supports JKS keystore format
- // See HIVE-6286 (Add support for PKCS12 keystore format)
- private static final String HIVE_SSL_TRUST_STORE_TYPE = "JKS";
- private final String jdbcURI;
- private final String host;
- private final int port;
+ private String jdbcUriString;
+ private String host;
+ private int port;
private final Map sessConfMap;
private final Map hiveConfMap;
private final Map hiveVarMap;
+ private JdbcConnectionParams connParams;
private final boolean isEmbeddedMode;
private TTransport transport;
- private TCLIService.Iface client; // todo should be replaced by CliServiceClient
+ // TODO should be replaced by CliServiceClient
+ private TCLIService.Iface client;
private boolean isClosed = true;
private SQLWarning warningChain = null;
private TSessionHandle sessHandle = null;
@@ -126,14 +110,12 @@
public HiveConnection(String uri, Properties info) throws SQLException {
setupLoginTimeout();
- jdbcURI = uri;
- // parse the connection uri
- Utils.JdbcConnectionParams connParams;
try {
connParams = Utils.parseURL(uri);
- } catch (IllegalArgumentException e) {
+ } catch (ZooKeeperHiveClientException e) {
throw new SQLException(e);
}
+ jdbcUriString = connParams.getJdbcUriString();
// extract parsed connection parameters:
// JDBC URL: jdbc:hive2://:/dbName;sess_var_list?hive_conf_list#hive_var_list
// each list: =;= and so on
@@ -164,14 +146,14 @@ public HiveConnection(String uri, Properties info) throws SQLException {
} else {
// extract user/password from JDBC connection properties if its not supplied in the
// connection URL
- if (info.containsKey(HIVE_AUTH_USER)) {
- sessConfMap.put(HIVE_AUTH_USER, info.getProperty(HIVE_AUTH_USER));
- if (info.containsKey(HIVE_AUTH_PASSWD)) {
- sessConfMap.put(HIVE_AUTH_PASSWD, info.getProperty(HIVE_AUTH_PASSWD));
+ if (info.containsKey(JdbcConnectionParams.AUTH_USER)) {
+ sessConfMap.put(JdbcConnectionParams.AUTH_USER, info.getProperty(JdbcConnectionParams.AUTH_USER));
+ if (info.containsKey(JdbcConnectionParams.AUTH_PASSWD)) {
+ sessConfMap.put(JdbcConnectionParams.AUTH_PASSWD, info.getProperty(JdbcConnectionParams.AUTH_PASSWD));
}
}
- if (info.containsKey(HIVE_AUTH_TYPE)) {
- sessConfMap.put(HIVE_AUTH_TYPE, info.getProperty(HIVE_AUTH_TYPE));
+ if (info.containsKey(JdbcConnectionParams.AUTH_TYPE)) {
+ sessConfMap.put(JdbcConnectionParams.AUTH_TYPE, info.getProperty(JdbcConnectionParams.AUTH_TYPE));
}
// open the client transport
openTransport();
@@ -189,19 +171,44 @@ public HiveConnection(String uri, Properties info) throws SQLException {
supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V7);
// open client session
- openSession(connParams);
+ openSession();
}
private void openTransport() throws SQLException {
- // TODO: Refactor transport creation to a factory, it's getting uber messy here
- transport = isHttpTransportMode() ? createHttpTransport() : createBinaryTransport();
- try {
- if (!transport.isOpen()) {
- transport.open();
+ while (true) {
+ transport = isHttpTransportMode() ? createHttpTransport() : createBinaryTransport();
+ try {
+ if (!transport.isOpen()) {
+ LOG.info("Will try to open client transport with JDBC Uri: " + jdbcUriString);
+ transport.open();
+ }
+ break;
+ } catch (TTransportException e) {
+ LOG.info("Could not open client transport with JDBC Uri: " + jdbcUriString);
+ // We'll retry till we exhaust all HiveServer2 uris from ZooKeeper
+ if ((sessConfMap.get(JdbcConnectionParams.SERVICE_DISCOVERY_MODE) != null)
+ && (JdbcConnectionParams.SERVICE_DISCOVERY_MODE_ZOOKEEPER.equalsIgnoreCase(sessConfMap
+ .get(JdbcConnectionParams.SERVICE_DISCOVERY_MODE)))) {
+ try {
+ // Update jdbcUriString, host & port variables in connParams
+ // Throw an exception if all HiveServer2 uris have been exhausted,
+ // or if we're unable to connect to ZooKeeper.
+ Utils.updateConnParamsFromZooKeeper(connParams);
+ } catch (ZooKeeperHiveClientException ze) {
+ throw new SQLException(
+ "Could not open client transport for any of the Server URI's in ZooKeeper: "
+ + ze.getMessage(), " 08S01", ze);
+ }
+ // Update with new values
+ jdbcUriString = connParams.getJdbcUriString();
+ host = connParams.getHost();
+ port = connParams.getPort();
+ LOG.info("Will retry opening client transport");
+ } else {
+ throw new SQLException("Could not open client transport with JDBC Uri: " + jdbcUriString
+ + ": " + e.getMessage(), " 08S01", e);
+ }
}
- } catch (TTransportException e) {
- throw new SQLException("Could not open connection to "
- + jdbcURI + ": " + e.getMessage(), " 08S01", e);
}
}
@@ -211,14 +218,13 @@ private String getServerHttpUrl(boolean useSsl) {
String schemeName = useSsl ? "https" : "http";
// http path should begin with "/"
String httpPath;
- httpPath = hiveConfMap.get(HIVE_SERVER2_THRIFT_HTTP_PATH);
- if(httpPath == null) {
+ httpPath = hiveConfMap.get(JdbcConnectionParams.HTTP_PATH);
+ if (httpPath == null) {
httpPath = "/";
- }
- else if(!httpPath.startsWith("/")) {
+ } else if (!httpPath.startsWith("/")) {
httpPath = "/" + httpPath;
}
- return schemeName + "://" + host + ":" + port + httpPath;
+ return schemeName + "://" + host + ":" + port + httpPath;
}
private TTransport createHttpTransport() throws SQLException {
@@ -231,7 +237,7 @@ private TTransport createHttpTransport() throws SQLException {
httpClient = getHttpClient(useSsl);
} catch (Exception e) {
String msg = "Could not create http connection to " +
- jdbcURI + ". " + e.getMessage();
+ jdbcUriString + ". " + e.getMessage();
throw new SQLException(msg, " 08S01", e);
}
@@ -240,7 +246,7 @@ private TTransport createHttpTransport() throws SQLException {
}
catch (TTransportException e) {
String msg = "Could not create http connection to " +
- jdbcURI + ". " + e.getMessage();
+ jdbcUriString + ". " + e.getMessage();
throw new SQLException(msg, " 08S01", e);
}
return transport;
@@ -263,7 +269,7 @@ private DefaultHttpClient getHttpClient(Boolean useSsl) throws SQLException {
* for sending to the server before every request.
*/
requestInterceptor = new HttpKerberosRequestInterceptor(
- sessConfMap.get(HIVE_AUTH_PRINCIPAL), host, getServerHttpUrl(false));
+ sessConfMap.get(JdbcConnectionParams.AUTH_PRINCIPAL), host, getServerHttpUrl(false));
}
else {
/**
@@ -273,9 +279,9 @@ private DefaultHttpClient getHttpClient(Boolean useSsl) throws SQLException {
requestInterceptor = new HttpBasicAuthInterceptor(getUserName(), getPassword());
// Configure httpClient for SSL
if (useSsl) {
- String sslTrustStorePath = sessConfMap.get(HIVE_SSL_TRUST_STORE);
+ String sslTrustStorePath = sessConfMap.get(JdbcConnectionParams.SSL_TRUST_STORE);
String sslTrustStorePassword = sessConfMap.get(
- HIVE_SSL_TRUST_STORE_PASSWORD);
+ JdbcConnectionParams.SSL_TRUST_STORE_PASSWORD);
KeyStore sslTrustStore;
SSLSocketFactory socketFactory;
try {
@@ -285,7 +291,7 @@ private DefaultHttpClient getHttpClient(Boolean useSsl) throws SQLException {
}
else {
// Pick trust store config from the given path
- sslTrustStore = KeyStore.getInstance(HIVE_SSL_TRUST_STORE_TYPE);
+ sslTrustStore = KeyStore.getInstance(JdbcConnectionParams.SSL_TRUST_STORE_TYPE);
sslTrustStore.load(new FileInputStream(sslTrustStorePath),
sslTrustStorePassword.toCharArray());
socketFactory = new SSLSocketFactory(sslTrustStore);
@@ -296,7 +302,7 @@ private DefaultHttpClient getHttpClient(Boolean useSsl) throws SQLException {
}
catch (Exception e) {
String msg = "Could not create an https connection to " +
- jdbcURI + ". " + e.getMessage();
+ jdbcUriString + ". " + e.getMessage();
throw new SQLException(msg, " 08S01", e);
}
}
@@ -320,25 +326,27 @@ private DefaultHttpClient getHttpClient(Boolean useSsl) throws SQLException {
private TTransport createBinaryTransport() throws SQLException {
try {
// handle secure connection if specified
- if (!HIVE_AUTH_SIMPLE.equals(sessConfMap.get(HIVE_AUTH_TYPE))) {
+ if (!JdbcConnectionParams.AUTH_SIMPLE.equals(sessConfMap.get(JdbcConnectionParams.AUTH_TYPE))) {
// If Kerberos
Map saslProps = new HashMap();
SaslQOP saslQOP = SaslQOP.AUTH;
- if (sessConfMap.containsKey(HIVE_AUTH_PRINCIPAL)) {
- if (sessConfMap.containsKey(HIVE_AUTH_QOP)) {
+ if (sessConfMap.containsKey(JdbcConnectionParams.AUTH_PRINCIPAL)) {
+ if (sessConfMap.containsKey(JdbcConnectionParams.AUTH_QOP)) {
try {
- saslQOP = SaslQOP.fromString(sessConfMap.get(HIVE_AUTH_QOP));
+ saslQOP = SaslQOP.fromString(sessConfMap.get(JdbcConnectionParams.AUTH_QOP));
} catch (IllegalArgumentException e) {
- throw new SQLException("Invalid " + HIVE_AUTH_QOP +
+ throw new SQLException("Invalid " + JdbcConnectionParams.AUTH_QOP +
" parameter. " + e.getMessage(), "42000", e);
}
}
saslProps.put(Sasl.QOP, saslQOP.toString());
saslProps.put(Sasl.SERVER_AUTH, "true");
- boolean assumeSubject = HIVE_AUTH_KERBEROS_AUTH_TYPE_FROM_SUBJECT.equals(sessConfMap.get(HIVE_AUTH_KERBEROS_AUTH_TYPE));
+ boolean assumeSubject = JdbcConnectionParams.AUTH_KERBEROS_AUTH_TYPE_FROM_SUBJECT.equals(sessConfMap
+ .get(JdbcConnectionParams.AUTH_KERBEROS_AUTH_TYPE));
transport = KerberosSaslHelper.getKerberosTransport(
- sessConfMap.get(HIVE_AUTH_PRINCIPAL), host,
- HiveAuthFactory.getSocketTransport(host, port, loginTimeout), saslProps, assumeSubject);
+ sessConfMap.get(JdbcConnectionParams.AUTH_PRINCIPAL), host,
+ HiveAuthFactory.getSocketTransport(host, port, loginTimeout), saslProps,
+ assumeSubject);
} else {
// If there's a delegation token available then use token based connection
String tokenStr = getClientDelegationToken(sessConfMap);
@@ -351,8 +359,8 @@ private TTransport createBinaryTransport() throws SQLException {
String passwd = getPassword();
if (isSslConnection()) {
// get SSL socket
- String sslTrustStore = sessConfMap.get(HIVE_SSL_TRUST_STORE);
- String sslTrustStorePassword = sessConfMap.get(HIVE_SSL_TRUST_STORE_PASSWORD);
+ String sslTrustStore = sessConfMap.get(JdbcConnectionParams.SSL_TRUST_STORE);
+ String sslTrustStorePassword = sessConfMap.get(JdbcConnectionParams.SSL_TRUST_STORE_PASSWORD);
if (sslTrustStore == null || sslTrustStore.isEmpty()) {
transport = HiveAuthFactory.getSSLSocket(host, port, loginTimeout);
} else {
@@ -373,10 +381,10 @@ private TTransport createBinaryTransport() throws SQLException {
}
} catch (SaslException e) {
throw new SQLException("Could not create secure connection to "
- + jdbcURI + ": " + e.getMessage(), " 08S01", e);
+ + jdbcUriString + ": " + e.getMessage(), " 08S01", e);
} catch (TTransportException e) {
throw new SQLException("Could not create connection to "
- + jdbcURI + ": " + e.getMessage(), " 08S01", e);
+ + jdbcUriString + ": " + e.getMessage(), " 08S01", e);
}
return transport;
}
@@ -385,7 +393,7 @@ private TTransport createBinaryTransport() throws SQLException {
private String getClientDelegationToken(Map jdbcConnConf)
throws SQLException {
String tokenStr = null;
- if (HIVE_AUTH_TOKEN.equalsIgnoreCase(jdbcConnConf.get(HIVE_AUTH_TYPE))) {
+ if (JdbcConnectionParams.AUTH_TOKEN.equalsIgnoreCase(jdbcConnConf.get(JdbcConnectionParams.AUTH_TYPE))) {
// check delegation token in job conf if any
try {
tokenStr = ShimLoader.getHadoopShims().
@@ -397,7 +405,7 @@ private String getClientDelegationToken(Map jdbcConnConf)
return tokenStr;
}
- private void openSession(Utils.JdbcConnectionParams connParams) throws SQLException {
+ private void openSession() throws SQLException {
TOpenSessionReq openReq = new TOpenSessionReq();
Map openConf = new HashMap();
@@ -433,7 +441,7 @@ private void openSession(Utils.JdbcConnectionParams connParams) throws SQLExcept
} catch (TException e) {
LOG.error("Error opening session", e);
throw new SQLException("Could not establish connection to "
- + jdbcURI + ": " + e.getMessage(), " 08S01", e);
+ + jdbcUriString + ": " + e.getMessage(), " 08S01", e);
}
isClosed = false;
}
@@ -442,27 +450,27 @@ private void openSession(Utils.JdbcConnectionParams connParams) throws SQLExcept
* @return username from sessConfMap
*/
private String getUserName() {
- return getSessionValue(HIVE_AUTH_USER, HIVE_ANONYMOUS_USER);
+ return getSessionValue(JdbcConnectionParams.AUTH_USER, JdbcConnectionParams.ANONYMOUS_USER);
}
/**
* @return password from sessConfMap
*/
private String getPassword() {
- return getSessionValue(HIVE_AUTH_PASSWD, HIVE_ANONYMOUS_PASSWD);
+ return getSessionValue(JdbcConnectionParams.AUTH_PASSWD, JdbcConnectionParams.ANONYMOUS_PASSWD);
}
private boolean isSslConnection() {
- return "true".equalsIgnoreCase(sessConfMap.get(HIVE_USE_SSL));
+ return "true".equalsIgnoreCase(sessConfMap.get(JdbcConnectionParams.USE_SSL));
}
private boolean isKerberosAuthMode() {
- return !HIVE_AUTH_SIMPLE.equals(sessConfMap.get(HIVE_AUTH_TYPE))
- && sessConfMap.containsKey(HIVE_AUTH_PRINCIPAL);
+ return !JdbcConnectionParams.AUTH_SIMPLE.equals(sessConfMap.get(JdbcConnectionParams.AUTH_TYPE))
+ && sessConfMap.containsKey(JdbcConnectionParams.AUTH_PRINCIPAL);
}
private boolean isHttpTransportMode() {
- String transportMode = hiveConfMap.get(HIVE_SERVER2_TRANSPORT_MODE);
+ String transportMode = hiveConfMap.get(JdbcConnectionParams.TRANSPORT_MODE);
if(transportMode != null && (transportMode.equalsIgnoreCase("http"))) {
return true;
}
diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveDriver.java b/jdbc/src/java/org/apache/hive/jdbc/HiveDriver.java
index 6e248d6..1134994 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/HiveDriver.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/HiveDriver.java
@@ -230,7 +230,12 @@ private Properties parseURLforPropertyInfo(String url, Properties defaults) thro
throw new SQLException("Invalid connection url: " + url);
}
- JdbcConnectionParams params = Utils.parseURL(url);
+ JdbcConnectionParams params = null;
+ try {
+ params = Utils.parseURL(url);
+ } catch (ZooKeeperHiveClientException e) {
+ throw new SQLException(e);
+ }
String host = params.getHost();
if (host == null){
host = "";
diff --git a/jdbc/src/java/org/apache/hive/jdbc/JdbcUriParseException.java b/jdbc/src/java/org/apache/hive/jdbc/JdbcUriParseException.java
new file mode 100644
index 0000000..6bb2e20
--- /dev/null
+++ b/jdbc/src/java/org/apache/hive/jdbc/JdbcUriParseException.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package org.apache.hive.jdbc;
+
+import java.sql.SQLException;
+
+public class JdbcUriParseException extends SQLException {
+
+ private static final long serialVersionUID = 0;
+
+ /**
+ * @param cause (original exception)
+ */
+ public JdbcUriParseException(Throwable cause) {
+ super(cause);
+ }
+
+ /**
+ * @param msg (exception message)
+ */
+ public JdbcUriParseException(String msg) {
+ super(msg);
+ }
+
+ /**
+ * @param msg (exception message)
+ * @param cause (original exception)
+ */
+ public JdbcUriParseException(String msg, Throwable cause) {
+ super(msg, cause);
+ }
+
+}
diff --git a/jdbc/src/java/org/apache/hive/jdbc/Utils.java b/jdbc/src/java/org/apache/hive/jdbc/Utils.java
index 58339bf..216975a 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/Utils.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/Utils.java
@@ -19,17 +19,23 @@
package org.apache.hive.jdbc;
import java.net.URI;
+import java.net.URISyntaxException;
import java.sql.SQLException;
+import java.util.ArrayList;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hive.service.cli.HiveSQLException;
import org.apache.hive.service.cli.thrift.TStatus;
import org.apache.hive.service.cli.thrift.TStatusCode;
public class Utils {
+ public static final Log LOG = LogFactory.getLog(Utils.class.getName());
/**
* The required prefix for the connection URL.
*/
@@ -48,13 +54,55 @@
private static final String URI_JDBC_PREFIX = "jdbc:";
public static class JdbcConnectionParams {
+ // Note on client side parameter naming convention:
+ // Prefer using a shorter camelCase param name instead of using the same name as the
+ // corresponding
+ // HiveServer2 config.
+ // For a jdbc url: jdbc:hive2://:/dbName;sess_var_list?hive_conf_list#hive_var_list,
+ // client side params are specified in sess_var_list
+
+ // Client param names:
+ static final String AUTH_TYPE = "auth";
+ static final String AUTH_QOP = "sasl.qop";
+ static final String AUTH_SIMPLE = "noSasl";
+ static final String AUTH_TOKEN = "delegationToken";
+ static final String AUTH_USER = "user";
+ static final String AUTH_PRINCIPAL = "principal";
+ static final String AUTH_PASSWD = "password";
+ static final String AUTH_KERBEROS_AUTH_TYPE = "kerberosAuthType";
+ static final String AUTH_KERBEROS_AUTH_TYPE_FROM_SUBJECT = "fromSubject";
+ static final String ANONYMOUS_USER = "anonymous";
+ static final String ANONYMOUS_PASSWD = "anonymous";
+ static final String USE_SSL = "ssl";
+ static final String SSL_TRUST_STORE = "sslTrustStore";
+ static final String SSL_TRUST_STORE_PASSWORD = "trustStorePassword";
+ static final String TRANSPORT_MODE = "hive.server2.transport.mode";
+ static final String HTTP_PATH = "hive.server2.thrift.http.path";
+ static final String SERVICE_DISCOVERY_MODE = "serviceDiscoveryMode";
+ // Don't use dynamic service discovery
+ static final String SERVICE_DISCOVERY_MODE_NONE = "none";
+ // Use ZooKeeper for indirection while using dynamic service discovery
+ static final String SERVICE_DISCOVERY_MODE_ZOOKEEPER = "zooKeeper";
+ static final String ZOOKEEPER_NAMESPACE = "zooKeeperNamespace";
+
+ // Non-configurable params:
+ // ZOOKEEPER_SESSION_TIMEOUT is not exposed as client configurable
+ static final int ZOOKEEPER_SESSION_TIMEOUT = 600 * 1000;
+ // Currently supports JKS keystore format
+ static final String SSL_TRUST_STORE_TYPE = "JKS";
+
private String host = null;
private int port;
+ private String jdbcUriString;
private String dbName = DEFAULT_DATABASE;
private Map hiveConfs = new LinkedHashMap();
private Map hiveVars = new LinkedHashMap();
private Map sessionVars = new LinkedHashMap();
private boolean isEmbeddedMode = false;
+ private String[] authorityList;
+ private String zooKeeperEnsemble = null;
+ private String currentHostZnodePath;
+ private List rejectedHostZnodePaths = new ArrayList();
public JdbcConnectionParams() {
}
@@ -62,46 +110,94 @@ public JdbcConnectionParams() {
public String getHost() {
return host;
}
+
public int getPort() {
return port;
}
+
+ public String getJdbcUriString() {
+ return jdbcUriString;
+ }
+
public String getDbName() {
return dbName;
}
+
public Map getHiveConfs() {
return hiveConfs;
}
- public Map getHiveVars() {
+
+ public Map getHiveVars() {
return hiveVars;
}
+
public boolean isEmbeddedMode() {
return isEmbeddedMode;
}
+
public Map getSessionVars() {
return sessionVars;
}
+ public String[] getAuthorityList() {
+ return authorityList;
+ }
+
+ public String getZooKeeperEnsemble() {
+ return zooKeeperEnsemble;
+ }
+
+ public List getRejectedHostZnodePaths() {
+ return rejectedHostZnodePaths;
+ }
+
+ public String getCurrentHostZnodePath() {
+ return currentHostZnodePath;
+ }
+
public void setHost(String host) {
this.host = host;
}
+
public void setPort(int port) {
this.port = port;
}
+
+ public void setJdbcUriString(String jdbcUriString) {
+ this.jdbcUriString = jdbcUriString;
+ }
+
public void setDbName(String dbName) {
this.dbName = dbName;
}
+
public void setHiveConfs(Map hiveConfs) {
this.hiveConfs = hiveConfs;
}
- public void setHiveVars(Map hiveVars) {
+
+ public void setHiveVars(Map hiveVars) {
this.hiveVars = hiveVars;
}
+
public void setEmbeddedMode(boolean embeddedMode) {
this.isEmbeddedMode = embeddedMode;
}
+
public void setSessionVars(Map sessionVars) {
this.sessionVars = sessionVars;
}
+
+ public void setSuppliedAuthorityList(String[] authorityList) {
+ this.authorityList = authorityList;
+ }
+
+ public void setZooKeeperEnsemble(String zooKeeperEnsemble) {
+ this.zooKeeperEnsemble = zooKeeperEnsemble;
+ }
+
+ public void setCurrentHostZnodePath(String currentHostZnodePath) {
+ this.currentHostZnodePath = currentHostZnodePath;
+ }
}
// Verify success or success_with_info status, else throw SQLException
@@ -124,27 +220,33 @@ public static void verifySuccess(TStatus status, boolean withInfo) throws SQLExc
/**
* Parse JDBC connection URL
- * The new format of the URL is jdbc:hive2://:/dbName;sess_var_list?hive_conf_list#hive_var_list
- * where the optional sess, conf and var lists are semicolon separated = pairs. As before, if the
- * host/port is not specified, it the driver runs an embedded hive.
+ * The new format of the URL is:
+ * jdbc:hive2://:,:/dbName;sess_var_list?hive_conf_list#hive_var_list
+ * where the optional sess, conf and var lists are semicolon separated = pairs.
* For utilizing dynamic service discovery with HiveServer2, multiple comma-separated host:port pairs can
+ * be specified as shown above.
+ * The JDBC driver resolves the list of uris and picks a specific server instance to connect to.
+ * Currently, dynamic service discovery using ZooKeeper is supported, in which case the host:port pairs represent a ZooKeeper ensemble.
+ *
* As before, if the host/port is not specified, the driver runs an embedded hive.
* examples -
* jdbc:hive2://ubuntu:11000/db2?hive.cli.conf.printheader=true;hive.exec.mode.local.auto.inputbytes.max=9999#stab=salesTable;icol=customerID
* jdbc:hive2://?hive.cli.conf.printheader=true;hive.exec.mode.local.auto.inputbytes.max=9999#stab=salesTable;icol=customerID
* jdbc:hive2://ubuntu:11000/db2;user=foo;password=bar
*
* Connect to http://server:10001/hs2, with specified basicAuth credentials and initial database:
- * jdbc:hive2://server:10001/db;user=foo;password=bar?hive.server2.transport.mode=http;hive.server2.thrift.http.path=hs2
- *
- * Note that currently the session properties are not used.
+ * jdbc:hive2://server:10001/db;user=foo;password=bar?hive.server2.transport.mode=http;hive.server2.thrift.http.path=hs2
*
* @param uri
* @return
+ * @throws SQLException
*/
- public static JdbcConnectionParams parseURL(String uri) throws IllegalArgumentException {
+ public static JdbcConnectionParams parseURL(String uri) throws JdbcUriParseException,
+ SQLException, ZooKeeperHiveClientException {
JdbcConnectionParams connParams = new JdbcConnectionParams();
if (!uri.startsWith(URL_PREFIX)) {
- throw new IllegalArgumentException("Bad URL format: Missing prefix " + URL_PREFIX);
+ throw new JdbcUriParseException("Bad URL format: Missing prefix " + URL_PREFIX);
}
// For URLs with no other configuration
@@ -154,28 +256,20 @@ public static JdbcConnectionParams parseURL(String uri) throws IllegalArgumentEx
return connParams;
}
- URI jdbcURI = URI.create(uri.substring(URI_JDBC_PREFIX.length()));
-
- // Check to prevent unintentional use of embedded mode. A missing "/"
- // to separate the 'path' portion of URI can result in this.
- // The missing "/" common typo while using secure mode, eg of such url -
- // jdbc:hive2://localhost:10000;principal=hive/HiveServer2Host@YOUR-REALM.COM
- if((jdbcURI.getAuthority() != null) && (jdbcURI.getHost()==null)) {
- throw new IllegalArgumentException("Bad URL format. Hostname not found "
- + " in authority part of the url: " + jdbcURI.getAuthority()
- + ". Are you missing a '/' after the hostname ?");
- }
+ // The JDBC URI now supports specifying multiple host:port if dynamic service discovery is
+ // configured on HiveServer2 (like: host1:port1,host2:port2,host3:port3)
+ // We'll extract the authorities (host:port combo) from the URI, extract session vars, hive
+ // confs & hive vars by parsing it as a Java URI.
+ // To parse the intermediate URI as a Java URI, we'll give a dummy authority (dummyhost:00000).
+ // Later, we'll substitute the dummy authority for a resolved authority.
+ String suppliedAuthorities = getAuthorities(uri, connParams);
+ String [] authorityList = suppliedAuthorities.split(",");
+ connParams.setSuppliedAuthorityList(authorityList);
+ String dummyAuthorityString = "dummyhost:00000";
+ uri = uri.replace(suppliedAuthorities, dummyAuthorityString);
- connParams.setHost(jdbcURI.getHost());
- if (connParams.getHost() == null) {
- connParams.setEmbeddedMode(true);
- } else {
- int port = jdbcURI.getPort();
- if (port == -1) {
- port = Integer.valueOf(DEFAULT_PORT);
- }
- connParams.setPort(port);
- }
+ // Now parse the connection uri with dummy authority
+ URI jdbcURI = URI.create(uri.substring(URI_JDBC_PREFIX.length()));
// key=value pattern
Pattern pattern = Pattern.compile("([^;]*)=([^;]*)[;]?");
@@ -192,12 +286,13 @@ public static JdbcConnectionParams parseURL(String uri) throws IllegalArgumentEx
} else {
// we have dbname followed by session parameters
dbName = sessVars.substring(0, sessVars.indexOf(';'));
- sessVars = sessVars.substring(sessVars.indexOf(';')+1);
+ sessVars = sessVars.substring(sessVars.indexOf(';') + 1);
if (sessVars != null) {
Matcher sessMatcher = pattern.matcher(sessVars);
while (sessMatcher.find()) {
if (connParams.getSessionVars().put(sessMatcher.group(1), sessMatcher.group(2)) != null) {
- throw new IllegalArgumentException("Bad URL format: Multiple values for property " + sessMatcher.group(1));
+ throw new JdbcUriParseException("Bad URL format: Multiple values for property "
+ + sessMatcher.group(1));
}
}
}
@@ -225,10 +320,132 @@ public static JdbcConnectionParams parseURL(String uri) throws IllegalArgumentEx
}
}
+ // If we're supporting dynamic service discovery, we'll need to substitute the dummy authority
+ // with a resolved one
+ String resolvedAuthorityString = resolveAuthority(connParams);
+ uri = uri.replace(dummyAuthorityString, resolvedAuthorityString);
+ connParams.setJdbcUriString(uri);
+
+ // Create a Java URI from the resolved URI for extracting the host/port
+ URI resolvedAuthorityURI = null;
+ try {
+ resolvedAuthorityURI = new URI(null, resolvedAuthorityString, null, null, null);
+ } catch (URISyntaxException e) {
+ throw new JdbcUriParseException(e);
+ }
+ connParams.setHost(resolvedAuthorityURI.getHost());
+ connParams.setPort(resolvedAuthorityURI.getPort());
+
return connParams;
}
/**
+ * Get the authority string from the supplied uri, which could potentially contain multiple
+ * host:port pairs.
+ *
+ * @param uri
+ * @param connParams
+ * @return
+ * @throws JdbcUriParseException
+ */
+ private static String getAuthorities(String uri, JdbcConnectionParams connParams)
+ throws JdbcUriParseException {
+ String authorities;
+ // For a jdbc uri like: jdbc:hive2://host1:port1,host2:port2,host3:port3/
+ // Extract the uri host:port list starting after "jdbc:hive2://", till the 1st "/"
+ int fromIndex = Utils.URL_PREFIX.length();
+ int toIndex = uri.indexOf("/", fromIndex);
+ if (toIndex < 0) {
+ throw new JdbcUriParseException("Bad URL format: Missing a '/' after the hostname.");
+ }
+ authorities = uri.substring(fromIndex, uri.indexOf("/", fromIndex));
+ if ((authorities == null) || authorities.isEmpty()) {
+ throw new JdbcUriParseException("Please specify the list of host:port to connect to, "
+ + "in the format: jdbc:hive2://host1:port1,host2:port2,host3:port3/");
+ }
+ LOG.info("Supplied authorties: " + authorities);
+ return authorities;
+ }
+
+ /**
+ * Get a string representing a specific host:port
+ * @param connParams
+ * @return
+ * @throws JdbcUriParseException
+ * @throws ZooKeeperHiveClientException
+ */
+ private static String resolveAuthority(JdbcConnectionParams connParams)
+ throws JdbcUriParseException, ZooKeeperHiveClientException {
+ String serviceDiscoveryMode =
+ connParams.getSessionVars().get(JdbcConnectionParams.SERVICE_DISCOVERY_MODE);
+ if ((serviceDiscoveryMode != null)
+ && (JdbcConnectionParams.SERVICE_DISCOVERY_MODE_ZOOKEEPER
+ .equalsIgnoreCase(serviceDiscoveryMode))) {
+ // Resolve using ZooKeeper
+ return resolveAuthorityUsingZooKeeper(connParams);
+ } else {
+ // Return the 1st element of the array
+ return connParams.getAuthorityList()[0];
+ }
+ }
+
+ /**
+ * Read a specific host:port from ZooKeeper
+ * @param connParams
+ * @return
+ * @throws ZooKeeperHiveClientException
+ */
+ private static String resolveAuthorityUsingZooKeeper(JdbcConnectionParams connParams)
+ throws ZooKeeperHiveClientException {
+ // Set ZooKeeper ensemble in connParams for later use
+ connParams.setZooKeeperEnsemble(joinStringArray(connParams.getAuthorityList(), ","));
+ return ZooKeeperHiveClientHelper.getNextServerUriFromZooKeeper(connParams);
+ }
+
+ /**
+ * Read the next server coordinates (host:port combo) from ZooKeeper. Ignore the znodes already
+ * explored. Also update the host, port, jdbcUriString fields of connParams.
+ *
+ * @param connParams
+ * @throws ZooKeeperHiveClientException
+ */
+ static void updateConnParamsFromZooKeeper(JdbcConnectionParams connParams)
+ throws ZooKeeperHiveClientException {
+ // Add current host to the rejected list
+ connParams.getRejectedHostZnodePaths().add(connParams.getCurrentHostZnodePath());
+ // Get another HiveServer2 uri from ZooKeeper
+ String serverUriString = ZooKeeperHiveClientHelper.getNextServerUriFromZooKeeper(connParams);
+ // Parse serverUri to a java URI and extract host, port
+ URI serverUri = null;
+ try {
+ // Note URL_PREFIX is not a valid scheme format, therefore leaving it null in the constructor
+ // to construct a valid URI
+ serverUri = new URI(null, serverUriString, null, null, null);
+ } catch (URISyntaxException e) {
+ throw new ZooKeeperHiveClientException(e);
+ }
+ String oldServerHost = connParams.getHost();
+ int oldServerPort = connParams.getPort();
+ String newServerHost = serverUri.getHost();
+ int newServerPort = serverUri.getPort();
+ connParams.setHost(newServerHost);
+ connParams.setPort(newServerPort);
+ connParams.setJdbcUriString(connParams.getJdbcUriString().replace(
+ oldServerHost + ":" + oldServerPort, newServerHost + ":" + newServerPort));
+ }
+
+ private static String joinStringArray(String[] stringArray, String seperator) {
+ StringBuilder stringBuilder = new StringBuilder();
+ for (int cur = 0, end = stringArray.length; cur < end; cur++) {
+ if (cur > 0) {
+ stringBuilder.append(seperator);
+ }
+ stringBuilder.append(stringArray[cur]);
+ }
+ return stringBuilder.toString();
+ }
+
+ /**
* Takes a version string delimited by '.' and '-' characters
* and returns a partial version.
*
diff --git a/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientException.java b/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientException.java
new file mode 100644
index 0000000..186c676
--- /dev/null
+++ b/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientException.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package org.apache.hive.jdbc;
+
+public class ZooKeeperHiveClientException extends Exception {
+
+ private static final long serialVersionUID = 0;
+
+ /**
+ * @param cause (original exception)
+ */
+ public ZooKeeperHiveClientException(Throwable cause) {
+ super(cause);
+ }
+
+ /**
+ * @param msg (exception message)
+ */
+ public ZooKeeperHiveClientException(String msg) {
+ super(msg);
+ }
+
+ /**
+ * @param msg (exception message)
+ * @param cause (original exception)
+ */
+ public ZooKeeperHiveClientException(String msg, Throwable cause) {
+ super(msg, cause);
+ }
+
+}
diff --git a/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java b/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java
new file mode 100644
index 0000000..06795a5
--- /dev/null
+++ b/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.jdbc;
+
+import java.nio.charset.Charset;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hive.jdbc.Utils.JdbcConnectionParams;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+
+public class ZooKeeperHiveClientHelper {
+ public static final Log LOG = LogFactory.getLog(ZooKeeperHiveClientHelper.class.getName());
+
+ /**
+ * A no-op watcher class
+ */
+ public static class DummyWatcher implements Watcher {
+ public void process(org.apache.zookeeper.WatchedEvent event) {
+ }
+ }
+
+ /**
+ * Resolve to a host:port by connecting to ZooKeeper and picking a host randomly.
+ *
+ * @param uri
+ * @param connParams
+ * @return
+ * @throws SQLException
+ */
+ static String getNextServerUriFromZooKeeper(JdbcConnectionParams connParams)
+ throws ZooKeeperHiveClientException {
+ String zooKeeperEnsemble = connParams.getZooKeeperEnsemble();
+ String zooKeeperNamespace =
+ connParams.getSessionVars().get(JdbcConnectionParams.ZOOKEEPER_NAMESPACE);
+ List serverHosts;
+ Random randomizer = new Random();
+ String serverNode;
+ // Pick a random HiveServer2 host from the ZooKeeper namespace
+ try {
+ ZooKeeper zooKeeperClient =
+ new ZooKeeper(zooKeeperEnsemble, JdbcConnectionParams.ZOOKEEPER_SESSION_TIMEOUT,
+ new ZooKeeperHiveClientHelper.DummyWatcher());
+ // All the HiveServer2 host nodes that are in ZooKeeper currently
+ serverHosts = zooKeeperClient.getChildren("/" + zooKeeperNamespace, false);
+ // Remove the znodes we've already tried from this list
+ serverHosts.removeAll(connParams.getRejectedHostZnodePaths());
+ if (serverHosts.isEmpty()) {
+ throw new ZooKeeperHiveClientException(
+ "Tried all existing HiveServer2 uris from ZooKeeper.");
+ }
+ // Now pick a host randomly
+ serverNode = serverHosts.get(randomizer.nextInt(serverHosts.size()));
+ connParams.setCurrentHostZnodePath(serverNode);
+ // Read the value from the node (UTF-8 encoded byte array) and convert it to a String
+ String serverUri =
+ new String(zooKeeperClient.getData("/" + zooKeeperNamespace + "/" + serverNode, false,
+ null), Charset.forName("UTF-8"));
+ LOG.info("Selected HiveServer2 instance with uri: " + serverUri);
+ return serverUri;
+ } catch (Exception e) {
+ throw new ZooKeeperHiveClientException("Unable to read HiveServer2 uri from ZooKeeper", e);
+ }
+ }
+
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java
index 0919d2f..1334a91 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java
@@ -27,6 +27,7 @@
import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject.HiveLockObjectData;
import org.apache.hadoop.hive.ql.metadata.*;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.hive.ql.util.ZooKeeperHiveHelper;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.KeeperException;
@@ -73,31 +74,6 @@ public ZooKeeperHiveLockManager() {
}
/**
- * @param conf The hive configuration
- * Get the quorum server address from the configuration. The format is:
- * host1:port, host2:port..
- **/
- @VisibleForTesting
- static String getQuorumServers(HiveConf conf) {
- String[] hosts = conf.getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_QUORUM).split(",");
- String port = conf.getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT);
- StringBuilder quorum = new StringBuilder();
- for(int i=0; i locks = getLocks(conf, zkpClient, null, parent, false, false);
@@ -629,7 +605,8 @@ public static void releaseAllLocks(HiveConf conf) throws Exception {
if (fetchData) {
try {
- data = new HiveLockObjectData(new String(zkpClient.getData(curChild, new DummyWatcher(), null)));
+ data = new HiveLockObjectData(new String(zkpClient.getData(curChild,
+ new ZooKeeperHiveHelper.DummyWatcher(), null)));
data.setClientIp(clientIp);
} catch (Exception e) {
LOG.error("Error in getting data for " + curChild, e);
@@ -789,11 +766,6 @@ private static HiveLockMode getLockMode(HiveConf conf, String path) {
return null;
}
- public static class DummyWatcher implements Watcher {
- public void process(org.apache.zookeeper.WatchedEvent event) {
- }
- }
-
@Override
public void prepareRetry() throws LockException {
try {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/util/ZooKeeperHiveHelper.java b/ql/src/java/org/apache/hadoop/hive/ql/util/ZooKeeperHiveHelper.java
new file mode 100644
index 0000000..989f745
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/util/ZooKeeperHiveHelper.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.util;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.zookeeper.Watcher;
+
+public class ZooKeeperHiveHelper {
+
+ /**
+ * Get the ensemble server addresses from the configuration. The format is: host1:port,
+ * host2:port..
+ *
+ * @param conf
+ **/
+ public static String getQuorumServers(HiveConf conf) {
+ String[] hosts = conf.getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_QUORUM).split(",");
+ String port = conf.getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT);
+ StringBuilder quorum = new StringBuilder();
+ for (int i = 0; i < hosts.length; i++) {
+ quorum.append(hosts[i].trim());
+ if (!hosts[i].contains(":")) {
+ // if the hostname doesn't contain a port, add the configured port to hostname
+ quorum.append(":");
+ quorum.append(port);
+ }
+
+ if (i != hosts.length - 1)
+ quorum.append(",");
+ }
+
+ return quorum.toString();
+ }
+
+ /**
+ * A no-op watcher class
+ */
+ public static class DummyWatcher implements Watcher {
+ public void process(org.apache.zookeeper.WatchedEvent event) {
+ }
+ }
+
+}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/zookeeper/TestZookeeperLockManager.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/zookeeper/TestZookeeperLockManager.java
index 59294b1..aacb73f 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/zookeeper/TestZookeeperLockManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/zookeeper/TestZookeeperLockManager.java
@@ -25,6 +25,7 @@
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.lockmgr.HiveLockMode;
import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject;
+import org.apache.hadoop.hive.ql.util.ZooKeeperHiveHelper;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.junit.Assert;
@@ -87,14 +88,14 @@ public void testDeleteWithChildren() throws Exception {
public void testGetQuorumServers() {
conf.setVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_QUORUM, "node1");
conf.setVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT, "9999");
- Assert.assertEquals("node1:9999", ZooKeeperHiveLockManager.getQuorumServers(conf));
+ Assert.assertEquals("node1:9999", ZooKeeperHiveHelper.getQuorumServers(conf));
conf.setVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_QUORUM, "node1,node2,node3");
conf.setVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT, "9999");
- Assert.assertEquals("node1:9999,node2:9999,node3:9999", ZooKeeperHiveLockManager.getQuorumServers(conf));
+ Assert.assertEquals("node1:9999,node2:9999,node3:9999", ZooKeeperHiveHelper.getQuorumServers(conf));
conf.setVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_QUORUM, "node1:5666,node2,node3");
conf.setVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT, "9999");
- Assert.assertEquals("node1:5666,node2:9999,node3:9999", ZooKeeperHiveLockManager.getQuorumServers(conf));
+ Assert.assertEquals("node1:5666,node2:9999,node3:9999", ZooKeeperHiveHelper.getQuorumServers(conf));
}
}
diff --git a/service/src/java/org/apache/hive/service/cli/CLIService.java b/service/src/java/org/apache/hive/service/cli/CLIService.java
index a0bc905..9afd6f0 100644
--- a/service/src/java/org/apache/hive/service/cli/CLIService.java
+++ b/service/src/java/org/apache/hive/service/cli/CLIService.java
@@ -66,7 +66,7 @@
private UserGroupInformation httpUGI;
public CLIService() {
- super("CLIService");
+ super(CLIService.class.getSimpleName());
}
@Override
diff --git a/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java b/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
index f5a8f27..a57b6e5 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
@@ -47,7 +47,7 @@
new HashMap();
public OperationManager() {
- super("OperationManager");
+ super(OperationManager.class.getSimpleName());
}
@Override
diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
index b0bb8be..5231d5e 100644
--- a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
+++ b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
@@ -166,8 +166,8 @@ private void processGlobalInitFile() {
IHiveFileProcessor processor = new GlobalHivercFileProcessor();
try {
- if (hiveConf.getVar(ConfVars.HIVE_GLOBAL_INIT_FILE_LOCATION) != null) {
- String hiverc = hiveConf.getVar(ConfVars.HIVE_GLOBAL_INIT_FILE_LOCATION)
+ if (hiveConf.getVar(ConfVars.HIVE_SERVER2_GLOBAL_INIT_FILE_LOCATION) != null) {
+ String hiverc = hiveConf.getVar(ConfVars.HIVE_SERVER2_GLOBAL_INIT_FILE_LOCATION)
+ File.separator + SessionManager.HIVERCFILE;
if (new File(hiverc).exists()) {
LOG.info("Running global init file: " + hiverc);
diff --git a/service/src/java/org/apache/hive/service/cli/session/SessionManager.java b/service/src/java/org/apache/hive/service/cli/session/SessionManager.java
index 11d25cc..a96ce1b 100644
--- a/service/src/java/org/apache/hive/service/cli/session/SessionManager.java
+++ b/service/src/java/org/apache/hive/service/cli/session/SessionManager.java
@@ -67,7 +67,7 @@
private volatile boolean shutdown;
public SessionManager() {
- super("SessionManager");
+ super(SessionManager.class.getSimpleName());
}
@Override
diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
index 2b80adc..028d55e 100644
--- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
+++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
@@ -18,7 +18,6 @@
package org.apache.hive.service.cli.thrift;
-import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
@@ -40,72 +39,54 @@
public class ThriftBinaryCLIService extends ThriftCLIService {
public ThriftBinaryCLIService(CLIService cliService) {
- super(cliService, "ThriftBinaryCLIService");
+ super(cliService, ThriftBinaryCLIService.class.getSimpleName());
}
@Override
public void run() {
try {
- hiveAuthFactory = new HiveAuthFactory(hiveConf);
- TTransportFactory transportFactory = hiveAuthFactory.getAuthTransFactory();
- TProcessorFactory processorFactory = hiveAuthFactory.getAuthProcFactory(this);
-
- String portString = System.getenv("HIVE_SERVER2_THRIFT_PORT");
- if (portString != null) {
- portNum = Integer.valueOf(portString);
- } else {
- portNum = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_PORT);
- }
-
- String hiveHost = System.getenv("HIVE_SERVER2_THRIFT_BIND_HOST");
- if (hiveHost == null) {
- hiveHost = hiveConf.getVar(ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST);
- }
-
- if (hiveHost != null && !hiveHost.isEmpty()) {
- serverAddress = new InetSocketAddress(hiveHost, portNum);
- } else {
- serverAddress = new InetSocketAddress(portNum);
- }
-
- minWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MIN_WORKER_THREADS);
- maxWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MAX_WORKER_THREADS);
- workerKeepAliveTime = hiveConf.getTimeVar(
- ConfVars.HIVE_SERVER2_THRIFT_WORKER_KEEPALIVE_TIME, TimeUnit.SECONDS);
+ // Server thread pool
String threadPoolName = "HiveServer2-Handler-Pool";
ExecutorService executorService = new ThreadPoolExecutor(minWorkerThreads, maxWorkerThreads,
workerKeepAliveTime, TimeUnit.SECONDS, new SynchronousQueue(),
new ThreadFactoryWithGarbageCleanup(threadPoolName));
+ // Thrift configs
+ hiveAuthFactory = new HiveAuthFactory(hiveConf);
+ TTransportFactory transportFactory = hiveAuthFactory.getAuthTransFactory();
+ TProcessorFactory processorFactory = hiveAuthFactory.getAuthProcFactory(this);
TServerSocket serverSocket = null;
if (!hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_USE_SSL)) {
serverSocket = HiveAuthFactory.getServerSocket(hiveHost, portNum);
} else {
String keyStorePath = hiveConf.getVar(ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PATH).trim();
if (keyStorePath.isEmpty()) {
- throw new IllegalArgumentException(ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PATH.varname +
- " Not configured for SSL connection");
+ throw new IllegalArgumentException(ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PATH.varname
+ + " Not configured for SSL connection");
}
String keyStorePassword = ShimLoader.getHadoopShims().getPassword(hiveConf,
HiveConf.ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PASSWORD.varname);
- serverSocket = HiveAuthFactory.getServerSSLSocket(hiveHost, portNum,
- keyStorePath, keyStorePassword);
+ serverSocket = HiveAuthFactory.getServerSSLSocket(hiveHost, portNum, keyStorePath,
+ keyStorePassword);
}
+
+ // Server args
TThreadPoolServer.Args sargs = new TThreadPoolServer.Args(serverSocket)
- .processorFactory(processorFactory)
- .transportFactory(transportFactory)
- .protocolFactory(new TBinaryProtocol.Factory())
- .executorService(executorService);
+ .processorFactory(processorFactory).transportFactory(transportFactory)
+ .protocolFactory(new TBinaryProtocol.Factory()).executorService(executorService);
+ // TCP Server
server = new TThreadPoolServer(sargs);
-
- LOG.info("ThriftBinaryCLIService listening on " + serverAddress);
-
server.serve();
-
+ String msg = "Started " + ThriftBinaryCLIService.class.getSimpleName() + " on port "
+ + portNum + " with " + minWorkerThreads + "..." + maxWorkerThreads + " worker threads";
+ LOG.info(msg);
} catch (Throwable t) {
- LOG.error("Error: ", t);
+ LOG.fatal(
+ "Error starting HiveServer2: could not start "
+ + ThriftBinaryCLIService.class.getSimpleName(), t);
+ System.exit(-1);
}
-
}
+
}
diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
index 443c371..71da1d8 100644
--- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
+++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
@@ -22,6 +22,7 @@
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import javax.security.auth.login.LoginException;
@@ -34,6 +35,7 @@
import org.apache.hive.service.auth.TSetIpAddressProcessor;
import org.apache.hive.service.cli.*;
import org.apache.hive.service.cli.session.SessionManager;
+import org.apache.hive.service.server.HiveServer2;
import org.apache.thrift.TException;
import org.apache.thrift.server.TServer;
@@ -51,6 +53,7 @@
protected int portNum;
protected InetSocketAddress serverAddress;
+ protected String hiveHost;
protected TServer server;
protected org.eclipse.jetty.server.Server httpServer;
@@ -73,6 +76,43 @@ public ThriftCLIService(CLIService cliService, String serviceName) {
@Override
public synchronized void init(HiveConf hiveConf) {
this.hiveConf = hiveConf;
+
+ // Initialize common server configs needed in both binary & http modes
+ String portString;
+ hiveHost = System.getenv("HIVE_SERVER2_THRIFT_BIND_HOST");
+ if (hiveHost == null) {
+ hiveHost = hiveConf.getVar(ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST);
+ }
+ // HTTP mode
+ if (HiveServer2.isHTTPTransportMode(hiveConf)) {
+ workerKeepAliveTime =
+ hiveConf.getTimeVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_WORKER_KEEPALIVE_TIME,
+ TimeUnit.SECONDS);
+ portString = System.getenv("HIVE_SERVER2_THRIFT_HTTP_PORT");
+ if (portString != null) {
+ portNum = Integer.valueOf(portString);
+ } else {
+ portNum = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT);
+ }
+ }
+ // Binary mode
+ else {
+ workerKeepAliveTime =
+ hiveConf.getTimeVar(ConfVars.HIVE_SERVER2_THRIFT_WORKER_KEEPALIVE_TIME, TimeUnit.SECONDS);
+ portString = System.getenv("HIVE_SERVER2_THRIFT_PORT");
+ if (portString != null) {
+ portNum = Integer.valueOf(portString);
+ } else {
+ portNum = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_PORT);
+ }
+ }
+ if (hiveHost != null && !hiveHost.isEmpty()) {
+ serverAddress = new InetSocketAddress(hiveHost, portNum);
+ } else {
+ serverAddress = new InetSocketAddress(portNum);
+ }
+ minWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MIN_WORKER_THREADS);
+ maxWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MAX_WORKER_THREADS);
super.init(hiveConf);
}
@@ -105,6 +145,14 @@ public synchronized void stop() {
super.stop();
}
+ public int getPortNumber() {
+ return portNum;
+ }
+
+ public InetSocketAddress getServerAddress() {
+ return serverAddress;
+ }
+
@Override
public TGetDelegationTokenResp GetDelegationToken(TGetDelegationTokenReq req)
throws TException {
diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java
index 4067106..afea5b5 100644
--- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java
+++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java
@@ -48,100 +48,94 @@
public class ThriftHttpCLIService extends ThriftCLIService {
public ThriftHttpCLIService(CLIService cliService) {
- super(cliService, "ThriftHttpCLIService");
+ super(cliService, ThriftHttpCLIService.class.getSimpleName());
}
+ /**
+ * Configure Jetty to serve http requests. Example of a client connection URL:
+ * http://localhost:10000/servlets/thrifths2/ A gateway may cause actual target URL to differ,
+ * e.g. http://gateway:port/hive2/servlets/thrifths2/
+ */
@Override
public void run() {
try {
- // Configure Jetty to serve http requests
- // Example of a client connection URL: http://localhost:10000/servlets/thrifths2/
- // a gateway may cause actual target URL to differ, e.g. http://gateway:port/hive2/servlets/thrifths2/
-
+ // Verify config validity
verifyHttpConfiguration(hiveConf);
- String portString = System.getenv("HIVE_SERVER2_THRIFT_HTTP_PORT");
- if (portString != null) {
- portNum = Integer.valueOf(portString);
- } else {
- portNum = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT);
- }
-
- minWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_MIN_WORKER_THREADS);
- maxWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_MAX_WORKER_THREADS);
- workerKeepAliveTime = hiveConf.getTimeVar(
- ConfVars.HIVE_SERVER2_THRIFT_HTTP_WORKER_KEEPALIVE_TIME, TimeUnit.SECONDS);
-
- String httpPath = getHttpPath(hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH));
+ // HTTP Server
+ httpServer = new org.eclipse.jetty.server.Server(serverAddress);
- httpServer = new org.eclipse.jetty.server.Server();
+ // Server thread pool
String threadPoolName = "HiveServer2-HttpHandler-Pool";
ExecutorService executorService = new ThreadPoolExecutor(minWorkerThreads, maxWorkerThreads,
workerKeepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue(),
new ThreadFactoryWithGarbageCleanup(threadPoolName));
-
ExecutorThreadPool threadPool = new ExecutorThreadPool(executorService);
httpServer.setThreadPool(threadPool);
- SelectChannelConnector connector = new SelectChannelConnector();;
+ // Connector configs
+ SelectChannelConnector connector = new SelectChannelConnector();
boolean useSsl = hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_USE_SSL);
String schemeName = useSsl ? "https" : "http";
- String authType = hiveConf.getVar(ConfVars.HIVE_SERVER2_AUTHENTICATION);
- // Set during the init phase of HiveServer2 if auth mode is kerberos
- // UGI for the hive/_HOST (kerberos) principal
- UserGroupInformation serviceUGI = cliService.getServiceUGI();
- // UGI for the http/_HOST (SPNego) principal
- UserGroupInformation httpUGI = cliService.getHttpUGI();
-
+ // Change connector if SSL is used
if (useSsl) {
String keyStorePath = hiveConf.getVar(ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PATH).trim();
String keyStorePassword = ShimLoader.getHadoopShims().getPassword(hiveConf,
HiveConf.ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PASSWORD.varname);
if (keyStorePath.isEmpty()) {
- throw new IllegalArgumentException(ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PATH.varname +
- " Not configured for SSL connection");
+ throw new IllegalArgumentException(ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PATH.varname
+ + " Not configured for SSL connection");
}
SslContextFactory sslContextFactory = new SslContextFactory();
sslContextFactory.setKeyStorePath(keyStorePath);
sslContextFactory.setKeyStorePassword(keyStorePassword);
connector = new SslSelectChannelConnector(sslContextFactory);
}
-
connector.setPort(portNum);
// Linux:yes, Windows:no
connector.setReuseAddress(!Shell.WINDOWS);
-
- int maxIdleTime = (int) hiveConf.getTimeVar(
- ConfVars.HIVE_SERVER2_THRIFT_HTTP_MAX_IDLE_TIME, TimeUnit.MILLISECONDS);
+ int maxIdleTime = (int) hiveConf.getTimeVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_MAX_IDLE_TIME,
+ TimeUnit.MILLISECONDS);
connector.setMaxIdleTime(maxIdleTime);
-
+
httpServer.addConnector(connector);
+ // Thrift configs
hiveAuthFactory = new HiveAuthFactory(hiveConf);
TProcessorFactory processorFactory = hiveAuthFactory.getAuthProcFactory(this);
TProcessor processor = processorFactory.getProcessor(null);
-
TProtocolFactory protocolFactory = new TBinaryProtocol.Factory();
+ // Set during the init phase of HiveServer2 if auth mode is kerberos
+ // UGI for the hive/_HOST (kerberos) principal
+ UserGroupInformation serviceUGI = cliService.getServiceUGI();
+ // UGI for the http/_HOST (SPNego) principal
+ UserGroupInformation httpUGI = cliService.getHttpUGI();
+ String authType = hiveConf.getVar(ConfVars.HIVE_SERVER2_AUTHENTICATION);
+ TServlet thriftHttpServlet = new ThriftHttpServlet(processor, protocolFactory, authType,
+ serviceUGI, httpUGI);
- TServlet thriftHttpServlet = new ThriftHttpServlet(processor, protocolFactory,
- authType, serviceUGI, httpUGI);
-
+ // Context handler
final ServletContextHandler context = new ServletContextHandler(
ServletContextHandler.SESSIONS);
context.setContextPath("/");
-
+ String httpPath = getHttpPath(hiveConf
+ .getVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH));
httpServer.setHandler(context);
context.addServlet(new ServletHolder(thriftHttpServlet), httpPath);
// TODO: check defaults: maxTimeout, keepalive, maxBodySize, bodyRecieveDuration, etc.
+ // Finally, start the server
httpServer.start();
- String msg = "Started ThriftHttpCLIService in " + schemeName + " mode on port " + portNum +
- " path=" + httpPath +
- " with " + minWorkerThreads + ".." + maxWorkerThreads + " worker threads";
+ String msg = "Started " + ThriftHttpCLIService.class.getSimpleName() + " in " + schemeName
+ + " mode on port " + portNum + " path=" + httpPath + " with " + minWorkerThreads + "..."
+ + maxWorkerThreads + " worker threads";
LOG.info(msg);
httpServer.join();
} catch (Throwable t) {
- LOG.error("Error: ", t);
+ LOG.fatal(
+ "Error starting HiveServer2: could not start "
+ + ThriftHttpCLIService.class.getSimpleName(), t);
+ System.exit(-1);
}
}
@@ -191,7 +185,8 @@ private static void verifyHttpConfiguration(HiveConf hiveConf) {
// NONE in case of thrift mode uses SASL
LOG.warn(ConfVars.HIVE_SERVER2_AUTHENTICATION + " setting to " +
authType + ". SASL is not supported with http transport mode," +
- " so using equivalent of " + AuthTypes.NOSASL);
+ " so using equivalent of "
+ + AuthTypes.NOSASL);
}
}
diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java
index 124996c..a9f4ee1 100644
--- a/service/src/java/org/apache/hive/service/server/HiveServer2.java
+++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java
@@ -18,6 +18,8 @@
package org.apache.hive.service.server;
+import java.nio.charset.Charset;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.common.LogUtils;
@@ -25,12 +27,17 @@
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager;
+import org.apache.hadoop.hive.ql.util.ZooKeeperHiveHelper;
import org.apache.hive.common.util.HiveStringUtils;
import org.apache.hive.service.CompositeService;
import org.apache.hive.service.cli.CLIService;
import org.apache.hive.service.cli.thrift.ThriftBinaryCLIService;
import org.apache.hive.service.cli.thrift.ThriftCLIService;
import org.apache.hive.service.cli.thrift.ThriftHttpCLIService;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
/**
* HiveServer2.
@@ -41,9 +48,11 @@
private CLIService cliService;
private ThriftCLIService thriftCLIService;
+ private String znodePath;
+ private ZooKeeper zooKeeperClient;
public HiveServer2() {
- super("HiveServer2");
+ super(HiveServer2.class.getSimpleName());
HiveConf.setLoadHiveServer2Config(true);
}
@@ -52,20 +61,81 @@ public HiveServer2() {
public synchronized void init(HiveConf hiveConf) {
cliService = new CLIService();
addService(cliService);
+ if (isHTTPTransportMode(hiveConf)) {
+ thriftCLIService = new ThriftHttpCLIService(cliService);
+ } else {
+ thriftCLIService = new ThriftBinaryCLIService(cliService);
+ }
+ addService(thriftCLIService);
+ super.init(hiveConf);
+ // Add a shutdown hook for catching SIGTERM & SIGINT
+ final HiveServer2 hiveServer2 = this;
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ hiveServer2.stop();
+ }
+ });
+ }
+
+ public static boolean isHTTPTransportMode(HiveConf hiveConf) {
String transportMode = System.getenv("HIVE_SERVER2_TRANSPORT_MODE");
- if(transportMode == null) {
+ if (transportMode == null) {
transportMode = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_TRANSPORT_MODE);
}
- if(transportMode != null && (transportMode.equalsIgnoreCase("http"))) {
- thriftCLIService = new ThriftHttpCLIService(cliService);
+ if (transportMode != null && (transportMode.equalsIgnoreCase("http"))) {
+ return true;
}
- else {
- thriftCLIService = new ThriftBinaryCLIService(cliService);
+ return false;
+ }
+
+ /**
+ * @param hiveConf
+ * @throws Exception
+ */
+ private void addServerInstanceForDiscovery(HiveConf hiveConf) throws Exception {
+ int zooKeeperSessionTimeout =
+ hiveConf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT);
+ String zooKeeperEnsemble = ZooKeeperHiveHelper.getQuorumServers(hiveConf);
+ String rootNamespace = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE);
+ String instanceURI = getServerInstanceURI(hiveConf);
+ byte[] znodeDataUTF8 = getServerInstanceURI(hiveConf).getBytes(Charset.forName("UTF-8"));
+ zooKeeperClient =
+ new ZooKeeper(zooKeeperEnsemble, zooKeeperSessionTimeout,
+ new ZooKeeperHiveHelper.DummyWatcher());
+
+ // Create the parent znode; ignore if the parent already exists
+ try {
+ zooKeeperClient.create("/" + rootNamespace, new byte[0], Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ LOG.info("Created the root namespace: " + rootNamespace + " on ZooKeeper for HiveServer2");
+ } catch (KeeperException e) {
+ if (e.code() != KeeperException.Code.NODEEXISTS) {
+ LOG.fatal("Unable to create HiveServer2 namespace: /" + rootNamespace + " on ZooKeeper", e);
+ throw (e);
+ }
}
+ // Create a znode under the rootNamespace parent for this instance of the server
+ try {
+ znodePath =
+ zooKeeperClient.create("/" + rootNamespace + "/server", znodeDataUTF8,
+ Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
+ LOG.info("Created a znode on ZooKeeper for HiveServer2 uri: " + instanceURI);
+ } catch (KeeperException e) {
+ LOG.fatal("Unable to create a znode for this server instance", e);
+ throw new Exception(e);
+ }
+ }
- addService(thriftCLIService);
- super.init(hiveConf);
+ private void removeServerInstanceFromDiscovery() throws Exception {
+ zooKeeperClient.close();
+ LOG.info("Server instance removed from ZooKeeper");
+ }
+
+ private String getServerInstanceURI(HiveConf hiveConf) {
+ return thriftCLIService.getServerAddress().getHostName() + ":"
+ + thriftCLIService.getPortNumber();
}
@Override
@@ -75,23 +145,32 @@ public synchronized void start() {
@Override
public synchronized void stop() {
- super.stop();
- // there should already be an instance of the session pool manager.
- // if not, ignoring is fine while stopping the hive server.
+ LOG.info("Shutting down HiveServer2");
HiveConf hiveConf = this.getHiveConf();
+ super.stop();
+ // Remove this server instance from ZooKeeper if dynamic service discovery is set
+ if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY)) {
+ try {
+ removeServerInstanceFromDiscovery();
+ } catch (Exception e) {
+ LOG.error("Error removing znode for this HiveServer2 instance from ZooKeeper.", e);
+ }
+ }
+ // There should already be an instance of the session pool manager.
+ // If not, ignoring is fine while stopping HiveServer2.
if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS)) {
try {
TezSessionPoolManager.getInstance().stop();
} catch (Exception e) {
- LOG.error("Tez session pool manager stop had an error during stop of hive server");
- e.printStackTrace();
+ LOG.error("Tez session pool manager stop had an error during stop of HiveServer2. "
+ + "Shutting down HiveServer2 anyway.", e);
}
}
}
private static void startHiveServer2() throws Throwable {
long attempts = 0, maxAttempts = 1;
- while(true) {
+ while (true) {
HiveConf hiveConf = new HiveConf();
maxAttempts = hiveConf.getLongVar(HiveConf.ConfVars.HIVE_SERVER2_MAX_START_ATTEMPTS);
HiveServer2 server = null;
@@ -99,6 +178,11 @@ private static void startHiveServer2() throws Throwable {
server = new HiveServer2();
server.init(hiveConf);
server.start();
+ // If we're supporting dynamic service discovery, we'll add the service uri for this
+ // HiveServer2 instance to ZooKeeper as a znode.
+ if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY)) {
+ server.addServerInstanceForDiscovery(hiveConf);
+ }
if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS)) {
TezSessionPoolManager sessionPool = TezSessionPoolManager.getInstance();
sessionPool.setupPool(hiveConf);
@@ -106,19 +190,19 @@ private static void startHiveServer2() throws Throwable {
}
break;
} catch (Throwable throwable) {
- if(++attempts >= maxAttempts) {
+ if (++attempts >= maxAttempts) {
throw new Error("Max start attempts " + maxAttempts + " exhausted", throwable);
} else {
- LOG.warn("Error starting HiveServer2 on attempt " + attempts +
- ", will retry in 60 seconds", throwable);
+ LOG.warn("Error starting HiveServer2 on attempt " + attempts
+ + ", will retry in 60 seconds", throwable);
try {
if (server != null) {
server.stop();
server = null;
}
} catch (Exception e) {
- LOG.info("Exception caught when calling stop of HiveServer2 before" +
- " retrying start", e);
+ LOG.info(
+ "Exception caught when calling stop of HiveServer2 before" + " retrying start", e);
}
try {
Thread.sleep(60L * 1000L);
@@ -139,14 +223,15 @@ public static void main(String[] args) {
System.exit(-1);
}
- //NOTE: It is critical to do this here so that log4j is reinitialized
+ // NOTE: It is critical to do this here so that log4j is reinitialized
// before any of the other core hive classes are loaded
String initLog4jMessage = LogUtils.initHiveLog4j();
LOG.debug(initLog4jMessage);
HiveStringUtils.startupShutdownMessage(HiveServer2.class, args, LOG);
- //log debug message from "oproc" after log4j initialize properly
+ // log debug message from "oproc" after log4j initialize properly
LOG.debug(oproc.getDebugMessage().toString());
+
startHiveServer2();
} catch (LogInitializationException e) {
LOG.error("Error initializing log: " + e.getMessage(), e);
diff --git a/service/src/test/org/apache/hive/service/cli/session/TestSessionGlobalInitFile.java b/service/src/test/org/apache/hive/service/cli/session/TestSessionGlobalInitFile.java
index 66fc1fc..5b1cbc0 100644
--- a/service/src/test/org/apache/hive/service/cli/session/TestSessionGlobalInitFile.java
+++ b/service/src/test/org/apache/hive/service/cli/session/TestSessionGlobalInitFile.java
@@ -27,7 +27,11 @@
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hive.service.cli.*;
+import org.apache.hive.service.cli.CLIService;
+import org.apache.hive.service.cli.ICLIService;
+import org.apache.hive.service.cli.OperationHandle;
+import org.apache.hive.service.cli.RowSet;
+import org.apache.hive.service.cli.SessionHandle;
import org.apache.hive.service.cli.thrift.ThriftBinaryCLIService;
import org.apache.hive.service.cli.thrift.ThriftCLIServiceClient;
import org.junit.After;
@@ -83,7 +87,7 @@ public void setUp() throws Exception {
// set up service and client
HiveConf hiveConf = new HiveConf();
- hiveConf.setVar(HiveConf.ConfVars.HIVE_GLOBAL_INIT_FILE_LOCATION,
+ hiveConf.setVar(HiveConf.ConfVars.HIVE_SERVER2_GLOBAL_INIT_FILE_LOCATION,
initFile.getParentFile().getAbsolutePath());
service = new FakeEmbeddedThriftBinaryCLIService(hiveConf);
service.init(new HiveConf());