diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index cfb3f9a..185ea95 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -205,7 +205,7 @@ PLAN_SERIALIZATION("hive.plan.serialization.format", "kryo", "Query plan format serialization between client and task nodes. \n" + "Two supported values are : kryo and javaXML. Kryo is default."), - SCRATCHDIR("hive.exec.scratchdir", "/tmp/hive", + SCRATCHDIR("hive.exec.scratchdir", "/tmp/hive", "HDFS root scratch dir for Hive jobs which gets created with 777 permission. " + "For each connecting user, an HDFS scratch dir: ${hive.exec.scratchdir}/ is created, " + "with ${hive.scratch.dir.permission}."), @@ -215,7 +215,7 @@ DOWNLOADED_RESOURCES_DIR("hive.downloaded.resources.dir", "${system:java.io.tmpdir}" + File.separator + "${hive.session.id}_resources", "Temporary local directory for added resources in the remote file system."), - SCRATCHDIRPERMISSION("hive.scratch.dir.permission", "700", + SCRATCHDIRPERMISSION("hive.scratch.dir.permission", "700", "The permission for the user specific scratch directories that get created."), SUBMITVIACHILD("hive.exec.submitviachild", false, ""), SUBMITLOCALTASKVIACHILD("hive.exec.submit.local.task.via.child", true, @@ -1243,10 +1243,16 @@ "This param is to control whether or not only do lock on queries\n" + "that need to execute at least one mapred job."), + // Zookeeper related configs HIVE_ZOOKEEPER_QUORUM("hive.zookeeper.quorum", "", - "The list of ZooKeeper servers to talk to. This is only needed for read/write locks."), + "List of ZooKeeper servers to talk to. This is needed for: " + + "1. Read/write locks - when hive.lock.manager is set to " + + "org.apache.hadoop.hive.ql.lockmgr.zookeeper.ZooKeeperHiveLockManager, " + + "2. 
When HiveServer2 supports service discovery via Zookeeper."), HIVE_ZOOKEEPER_CLIENT_PORT("hive.zookeeper.client.port", "2181", - "The port of ZooKeeper servers to talk to. This is only needed for read/write locks."), + "The port of ZooKeeper servers to talk to. " + + "If the list of Zookeeper servers specified in hive.zookeeper.quorum," + + "does not contain port numbers, this value is used."), HIVE_ZOOKEEPER_SESSION_TIMEOUT("hive.zookeeper.session.timeout", 600*1000, "ZooKeeper client's session timeout. The client is disconnected, and as a result, all locks released, \n" + "if a heartbeat is not sent in the timeout."), @@ -1446,11 +1452,6 @@ "If the property is set, the value must be a valid URI (java.net.URI, e.g. \"file:///tmp/my-logging.properties\"), \n" + "which you can then extract a URL from and pass to PropertyConfigurator.configure(URL)."), - // Hive global init file location - HIVE_GLOBAL_INIT_FILE_LOCATION("hive.server2.global.init.file.location", "${env:HIVE_CONF_DIR}", - "The location of HS2 global init file (.hiverc).\n" + - "If the property is reset, the value must be a valid path where the init file is located."), - // prefix used to auto generated column aliases (this should be started with '_') HIVE_AUTOGEN_COLUMNALIAS_PREFIX_LABEL("hive.autogen.columnalias.prefix.label", "_c", "String used as a prefix when auto generating column alias.\n" + @@ -1489,16 +1490,29 @@ "table. From 0.12 onwards, they are displayed separately. This flag will let you\n" + "get old behavior, if desired. See, test-case in patch for HIVE-6689."), + // HiveServer2 specific configs HIVE_SERVER2_MAX_START_ATTEMPTS("hive.server2.max.start.attempts", 30L, new RangeValidator(0L, null), - "This number of times HiveServer2 will attempt to start before exiting, sleeping 60 seconds between retries. \n" + - "The default of 30 will keep trying for 30 minutes."), - + "Number of times HiveServer2 will attempt to start before exiting, sleeping 60 seconds " + + "between retries. 
\n The default of 30 will keep trying for 30 minutes."), + HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY("hive.server2.support.dynamic.service.discovery", false, + "Whether HiveServer2 supports dynamic service discovery for its clients. " + + "To support this, each instance of HiveServer2 currently uses ZooKeeper to register itself, " + + "when it is brought up. JDBC/ODBC clients should use the ZooKeeper ensemble: " + + "hive.zookeeper.quorum in their connection string."), + HIVE_SERVER2_ZOOKEEPER_NAMESPACE("hive.server2.zookeeper.namespace", "hiveserver2", + "The parent node in ZooKeeper used by HiveServer2 when supporting dynamic service discovery."), + // HiveServer2 global init file location + HIVE_SERVER2_GLOBAL_INIT_FILE_LOCATION("hive.server2.global.init.file.location", "${env:HIVE_CONF_DIR}", + "The location of HS2 global init file (.hiverc).\n" + + "If the property is reset, the value must be a valid path where the init file is located."), HIVE_SERVER2_TRANSPORT_MODE("hive.server2.transport.mode", "binary", new StringSet("binary", "http"), "Transport mode of HiveServer2."), + HIVE_SERVER2_THRIFT_BIND_HOST("hive.server2.thrift.bind.host", "", + "Bind host on which to run the HiveServer2 Thrift service."), // http (over thrift) transport settings HIVE_SERVER2_THRIFT_HTTP_PORT("hive.server2.thrift.http.port", 10001, - "Port number when in HTTP mode."), + "Port number of HiveServer2 Thrift interface when hive.server2.transport.mode is 'http'."), HIVE_SERVER2_THRIFT_HTTP_PATH("hive.server2.thrift.http.path", "cliservice", "Path component of URL endpoint when in HTTP mode."), HIVE_SERVER2_THRIFT_HTTP_MIN_WORKER_THREADS("hive.server2.thrift.http.min.worker.threads", 5, @@ -1515,11 +1529,7 @@ // binary transport settings HIVE_SERVER2_THRIFT_PORT("hive.server2.thrift.port", 10000, - "Port number of HiveServer2 Thrift interface.\n" + - "Can be overridden by setting $HIVE_SERVER2_THRIFT_PORT"), - HIVE_SERVER2_THRIFT_BIND_HOST("hive.server2.thrift.bind.host", "", - 
"Bind host on which to run the HiveServer2 Thrift interface.\n" + - "Can be overridden by setting $HIVE_SERVER2_THRIFT_BIND_HOST"), + "Port number of HiveServer2 Thrift interface when hive.server2.transport.mode is 'binary'."), // hadoop.rpc.protection being set to a higher level than HiveServer2 // does not make sense in most situations. // HiveServer2 ignores hadoop.rpc.protection in favor of hive.server2.thrift.sasl.qop. diff --git a/itests/hive-unit/src/test/java/org/apache/hive/beeline/TestBeeLineWithArgs.java b/itests/hive-unit/src/test/java/org/apache/hive/beeline/TestBeeLineWithArgs.java index e1d44ec..1e66542 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/beeline/TestBeeLineWithArgs.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/beeline/TestBeeLineWithArgs.java @@ -210,7 +210,7 @@ private void testScriptFile(String testName, String scriptText, String expectedP } scriptFile.delete(); } - + /** * Test that BeeLine will read comment lines that start with whitespace * @throws Throwable diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java index ae128a9..daaa71f 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java @@ -262,10 +262,9 @@ public void testBadURL() throws Exception { private void checkBadUrl(String url) throws SQLException { try{ DriverManager.getConnection(url, "", ""); - fail("should have thrown IllegalArgumentException but did not "); - } catch(SQLException i) { - assertTrue(i.getMessage().contains("Bad URL format. 
Hostname not found " - + " in authority part of the url")); + fail("Should have thrown JdbcUriParseException but did not "); + } catch(JdbcUriParseException e) { + assertTrue(e.getMessage().contains("Bad URL format")); } } @@ -1618,6 +1617,10 @@ public void testResultSetMetaData() throws SQLException { // [url] [host] [port] [db] private static final String[][] URL_PROPERTIES = new String[][] { // binary mode + // For embedded mode, the JDBC uri is of the form: + // jdbc:hive2:///dbName;sess_var_list?hive_conf_list#hive_var_list + // and does not contain host:port string. + // As a result port is parsed to '-1' per the Java URI conventions {"jdbc:hive2://", "", "", "default"}, {"jdbc:hive2://localhost:10001/default", "localhost", "10001", "default"}, {"jdbc:hive2://localhost/notdefault", "localhost", "10000", "notdefault"}, @@ -1654,7 +1657,8 @@ public void testDriverProperties() throws SQLException { }; @Test - public void testParseUrlHttpMode() throws SQLException { + public void testParseUrlHttpMode() throws SQLException, JdbcUriParseException, + ZooKeeperHiveClientException { new HiveDriver(); for (String[] testValues : HTTP_URL_PROPERTIES) { JdbcConnectionParams params = Utils.parseURL(testValues[0]); diff --git a/jdbc/pom.xml b/jdbc/pom.xml index 1ad13a7..0132f12 100644 --- a/jdbc/pom.xml +++ b/jdbc/pom.xml @@ -80,6 +80,17 @@ libthrift ${libthrift.version} + + org.apache.zookeeper + zookeeper + ${zookeeper.version} + + + org.jboss.netty + netty + + + diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java index cbcfec7..e0d2d6d 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java +++ b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java @@ -53,6 +53,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.shims.ShimLoader; +import org.apache.hive.jdbc.Utils.JdbcConnectionParams; import 
org.apache.hive.service.auth.HiveAuthFactory; import org.apache.hive.service.auth.KerberosSaslHelper; import org.apache.hive.service.auth.PlainSaslHelper; @@ -86,37 +87,20 @@ */ public class HiveConnection implements java.sql.Connection { public static final Log LOG = LogFactory.getLog(HiveConnection.class.getName()); - private static final String HIVE_AUTH_TYPE= "auth"; - private static final String HIVE_AUTH_QOP = "sasl.qop"; - private static final String HIVE_AUTH_SIMPLE = "noSasl"; - private static final String HIVE_AUTH_TOKEN = "delegationToken"; - private static final String HIVE_AUTH_USER = "user"; - private static final String HIVE_AUTH_PRINCIPAL = "principal"; - private static final String HIVE_AUTH_PASSWD = "password"; - private static final String HIVE_AUTH_KERBEROS_AUTH_TYPE = "kerberosAuthType"; - private static final String HIVE_AUTH_KERBEROS_AUTH_TYPE_FROM_SUBJECT = "fromSubject"; - private static final String HIVE_ANONYMOUS_USER = "anonymous"; - private static final String HIVE_ANONYMOUS_PASSWD = "anonymous"; - private static final String HIVE_USE_SSL = "ssl"; - private static final String HIVE_SSL_TRUST_STORE = "sslTrustStore"; - private static final String HIVE_SSL_TRUST_STORE_PASSWORD = "trustStorePassword"; - private static final String HIVE_SERVER2_TRANSPORT_MODE = "hive.server2.transport.mode"; - private static final String HIVE_SERVER2_THRIFT_HTTP_PATH = "hive.server2.thrift.http.path"; private static final String HIVE_VAR_PREFIX = "hivevar:"; private static final String HIVE_CONF_PREFIX = "hiveconf:"; - // Currently supports JKS keystore format - // See HIVE-6286 (Add support for PKCS12 keystore format) - private static final String HIVE_SSL_TRUST_STORE_TYPE = "JKS"; - private final String jdbcURI; - private final String host; - private final int port; + private String jdbcUriString; + private String host; + private int port; private final Map sessConfMap; private final Map hiveConfMap; private final Map hiveVarMap; + private 
JdbcConnectionParams connParams; private final boolean isEmbeddedMode; private TTransport transport; - private TCLIService.Iface client; // todo should be replaced by CliServiceClient + // TODO should be replaced by CliServiceClient + private TCLIService.Iface client; private boolean isClosed = true; private SQLWarning warningChain = null; private TSessionHandle sessHandle = null; @@ -126,14 +110,12 @@ public HiveConnection(String uri, Properties info) throws SQLException { setupLoginTimeout(); - jdbcURI = uri; - // parse the connection uri - Utils.JdbcConnectionParams connParams; try { connParams = Utils.parseURL(uri); - } catch (IllegalArgumentException e) { + } catch (ZooKeeperHiveClientException e) { throw new SQLException(e); } + jdbcUriString = connParams.getJdbcUriString(); // extract parsed connection parameters: // JDBC URL: jdbc:hive2://:/dbName;sess_var_list?hive_conf_list#hive_var_list // each list: =;= and so on @@ -164,14 +146,14 @@ public HiveConnection(String uri, Properties info) throws SQLException { } else { // extract user/password from JDBC connection properties if its not supplied in the // connection URL - if (info.containsKey(HIVE_AUTH_USER)) { - sessConfMap.put(HIVE_AUTH_USER, info.getProperty(HIVE_AUTH_USER)); - if (info.containsKey(HIVE_AUTH_PASSWD)) { - sessConfMap.put(HIVE_AUTH_PASSWD, info.getProperty(HIVE_AUTH_PASSWD)); + if (info.containsKey(JdbcConnectionParams.AUTH_USER)) { + sessConfMap.put(JdbcConnectionParams.AUTH_USER, info.getProperty(JdbcConnectionParams.AUTH_USER)); + if (info.containsKey(JdbcConnectionParams.AUTH_PASSWD)) { + sessConfMap.put(JdbcConnectionParams.AUTH_PASSWD, info.getProperty(JdbcConnectionParams.AUTH_PASSWD)); } } - if (info.containsKey(HIVE_AUTH_TYPE)) { - sessConfMap.put(HIVE_AUTH_TYPE, info.getProperty(HIVE_AUTH_TYPE)); + if (info.containsKey(JdbcConnectionParams.AUTH_TYPE)) { + sessConfMap.put(JdbcConnectionParams.AUTH_TYPE, info.getProperty(JdbcConnectionParams.AUTH_TYPE)); } // open the client 
transport openTransport(); @@ -189,19 +171,44 @@ public HiveConnection(String uri, Properties info) throws SQLException { supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V7); // open client session - openSession(connParams); + openSession(); } private void openTransport() throws SQLException { - // TODO: Refactor transport creation to a factory, it's getting uber messy here - transport = isHttpTransportMode() ? createHttpTransport() : createBinaryTransport(); - try { - if (!transport.isOpen()) { - transport.open(); + while (true) { + try { + transport = isHttpTransportMode() ? createHttpTransport() : createBinaryTransport(); + if (!transport.isOpen()) { + LOG.info("Will try to open client transport with JDBC Uri: " + jdbcUriString); + transport.open(); + } + break; + } catch (TTransportException e) { + LOG.info("Could not open client transport with JDBC Uri: " + jdbcUriString); + // We'll retry till we exhaust all HiveServer2 uris from ZooKeeper + if ((sessConfMap.get(JdbcConnectionParams.SERVICE_DISCOVERY_MODE) != null) + && (JdbcConnectionParams.SERVICE_DISCOVERY_MODE_ZOOKEEPER.equalsIgnoreCase(sessConfMap + .get(JdbcConnectionParams.SERVICE_DISCOVERY_MODE)))) { + try { + // Update jdbcUriString, host & port variables in connParams + // Throw an exception if all HiveServer2 uris have been exhausted, + // or if we're unable to connect to ZooKeeper. 
+ Utils.updateConnParamsFromZooKeeper(connParams); + } catch (ZooKeeperHiveClientException ze) { + throw new SQLException( + "Could not open client transport for any of the Server URI's in ZooKeeper: " + + ze.getMessage(), " 08S01", ze); + } + // Update with new values + jdbcUriString = connParams.getJdbcUriString(); + host = connParams.getHost(); + port = connParams.getPort(); + LOG.info("Will retry opening client transport"); + } else { + throw new SQLException("Could not open client transport with JDBC Uri: " + jdbcUriString + + ": " + e.getMessage(), " 08S01", e); + } } - } catch (TTransportException e) { - throw new SQLException("Could not open connection to " - + jdbcURI + ": " + e.getMessage(), " 08S01", e); } } @@ -211,37 +218,36 @@ private String getServerHttpUrl(boolean useSsl) { String schemeName = useSsl ? "https" : "http"; // http path should begin with "/" String httpPath; - httpPath = hiveConfMap.get(HIVE_SERVER2_THRIFT_HTTP_PATH); - if(httpPath == null) { + httpPath = hiveConfMap.get(JdbcConnectionParams.HTTP_PATH); + if (httpPath == null) { httpPath = "/"; - } - else if(!httpPath.startsWith("/")) { + } else if (!httpPath.startsWith("/")) { httpPath = "/" + httpPath; } - return schemeName + "://" + host + ":" + port + httpPath; + return schemeName + "://" + host + ":" + port + httpPath; } - private TTransport createHttpTransport() throws SQLException { + private TTransport createHttpTransport() throws SQLException, TTransportException { DefaultHttpClient httpClient; - boolean useSsl = isSslConnection(); - // Create an http client from the configs - try { - httpClient = getHttpClient(useSsl); - } catch (Exception e) { - String msg = "Could not create http connection to " + - jdbcURI + ". " + e.getMessage(); - throw new SQLException(msg, " 08S01", e); - } - + httpClient = getHttpClient(useSsl); try { transport = new THttpClient(getServerHttpUrl(useSsl), httpClient); + // We'll call an open/close here to send a test HTTP message to the server. 
Any + // TTransportException caused by trying to connect to a non-available peer are thrown here. + // Bubbling them up the call hierarchy so that a retry can happen in openTransport, + // if dynamic service discovery is configured. + TCLIService.Iface client = new TCLIService.Client(new TBinaryProtocol(transport)); + TOpenSessionResp openResp = client.OpenSession(new TOpenSessionReq()); + if (openResp != null) { + client.CloseSession(new TCloseSessionReq(openResp.getSessionHandle())); + } } - catch (TTransportException e) { + catch (TException e) { String msg = "Could not create http connection to " + - jdbcURI + ". " + e.getMessage(); - throw new SQLException(msg, " 08S01", e); + jdbcUriString + ". " + e.getMessage(); + throw new TTransportException(msg, e); } return transport; } @@ -263,7 +269,7 @@ private DefaultHttpClient getHttpClient(Boolean useSsl) throws SQLException { * for sending to the server before every request. */ requestInterceptor = new HttpKerberosRequestInterceptor( - sessConfMap.get(HIVE_AUTH_PRINCIPAL), host, getServerHttpUrl(false)); + sessConfMap.get(JdbcConnectionParams.AUTH_PRINCIPAL), host, getServerHttpUrl(false)); } else { /** @@ -273,11 +279,23 @@ private DefaultHttpClient getHttpClient(Boolean useSsl) throws SQLException { requestInterceptor = new HttpBasicAuthInterceptor(getUserName(), getPassword()); // Configure httpClient for SSL if (useSsl) { - String sslTrustStorePath = sessConfMap.get(HIVE_SSL_TRUST_STORE); + String sslTrustStorePath = sessConfMap.get(JdbcConnectionParams.SSL_TRUST_STORE); String sslTrustStorePassword = sessConfMap.get( - HIVE_SSL_TRUST_STORE_PASSWORD); + JdbcConnectionParams.SSL_TRUST_STORE_PASSWORD); KeyStore sslTrustStore; SSLSocketFactory socketFactory; + /** + * The code within the try block throws: + * 1. SSLInitializationException + * 2. KeyStoreException + * 3. IOException + * 4. NoSuchAlgorithmException + * 5. CertificateException + * 6. KeyManagementException + * 7. 
UnrecoverableKeyException + * We don't want the client to retry on any of these, hence we catch all + * and throw a SQLException. + */ try { if (sslTrustStorePath == null || sslTrustStorePath.isEmpty()) { // Create a default socket factory based on standard JSSE trust material @@ -285,7 +303,7 @@ private DefaultHttpClient getHttpClient(Boolean useSsl) throws SQLException { } else { // Pick trust store config from the given path - sslTrustStore = KeyStore.getInstance(HIVE_SSL_TRUST_STORE_TYPE); + sslTrustStore = KeyStore.getInstance(JdbcConnectionParams.SSL_TRUST_STORE_TYPE); sslTrustStore.load(new FileInputStream(sslTrustStorePath), sslTrustStorePassword.toCharArray()); socketFactory = new SSLSocketFactory(sslTrustStore); @@ -296,7 +314,7 @@ private DefaultHttpClient getHttpClient(Boolean useSsl) throws SQLException { } catch (Exception e) { String msg = "Could not create an https connection to " + - jdbcURI + ". " + e.getMessage(); + jdbcUriString + ". " + e.getMessage(); throw new SQLException(msg, " 08S01", e); } } @@ -316,29 +334,32 @@ private DefaultHttpClient getHttpClient(Boolean useSsl) throws SQLException { * - Raw (non-SASL) socket * * Kerberos and Delegation token supports SASL QOP configurations + * @throws SQLException, TTransportException */ - private TTransport createBinaryTransport() throws SQLException { + private TTransport createBinaryTransport() throws SQLException, TTransportException { try { // handle secure connection if specified - if (!HIVE_AUTH_SIMPLE.equals(sessConfMap.get(HIVE_AUTH_TYPE))) { + if (!JdbcConnectionParams.AUTH_SIMPLE.equals(sessConfMap.get(JdbcConnectionParams.AUTH_TYPE))) { // If Kerberos Map saslProps = new HashMap(); SaslQOP saslQOP = SaslQOP.AUTH; - if (sessConfMap.containsKey(HIVE_AUTH_PRINCIPAL)) { - if (sessConfMap.containsKey(HIVE_AUTH_QOP)) { + if (sessConfMap.containsKey(JdbcConnectionParams.AUTH_PRINCIPAL)) { + if (sessConfMap.containsKey(JdbcConnectionParams.AUTH_QOP)) { try { - saslQOP = 
SaslQOP.fromString(sessConfMap.get(HIVE_AUTH_QOP)); + saslQOP = SaslQOP.fromString(sessConfMap.get(JdbcConnectionParams.AUTH_QOP)); } catch (IllegalArgumentException e) { - throw new SQLException("Invalid " + HIVE_AUTH_QOP + + throw new SQLException("Invalid " + JdbcConnectionParams.AUTH_QOP + " parameter. " + e.getMessage(), "42000", e); } } saslProps.put(Sasl.QOP, saslQOP.toString()); saslProps.put(Sasl.SERVER_AUTH, "true"); - boolean assumeSubject = HIVE_AUTH_KERBEROS_AUTH_TYPE_FROM_SUBJECT.equals(sessConfMap.get(HIVE_AUTH_KERBEROS_AUTH_TYPE)); + boolean assumeSubject = JdbcConnectionParams.AUTH_KERBEROS_AUTH_TYPE_FROM_SUBJECT.equals(sessConfMap + .get(JdbcConnectionParams.AUTH_KERBEROS_AUTH_TYPE)); transport = KerberosSaslHelper.getKerberosTransport( - sessConfMap.get(HIVE_AUTH_PRINCIPAL), host, - HiveAuthFactory.getSocketTransport(host, port, loginTimeout), saslProps, assumeSubject); + sessConfMap.get(JdbcConnectionParams.AUTH_PRINCIPAL), host, + HiveAuthFactory.getSocketTransport(host, port, loginTimeout), saslProps, + assumeSubject); } else { // If there's a delegation token available then use token based connection String tokenStr = getClientDelegationToken(sessConfMap); @@ -349,10 +370,15 @@ private TTransport createBinaryTransport() throws SQLException { // we are using PLAIN Sasl connection with user/password String userName = getUserName(); String passwd = getPassword(); + // Note: Thrift returns an SSL socket that is already bound to the specified host:port + // Therefore an open called on this would be a no-op later + // Hence, any TTransportException related to connecting with the peer are thrown here. + // Bubbling them up the call hierarchy so that a retry can happen in openTransport, + // if dynamic service discovery is configured. 
if (isSslConnection()) { // get SSL socket - String sslTrustStore = sessConfMap.get(HIVE_SSL_TRUST_STORE); - String sslTrustStorePassword = sessConfMap.get(HIVE_SSL_TRUST_STORE_PASSWORD); + String sslTrustStore = sessConfMap.get(JdbcConnectionParams.SSL_TRUST_STORE); + String sslTrustStorePassword = sessConfMap.get(JdbcConnectionParams.SSL_TRUST_STORE_PASSWORD); if (sslTrustStore == null || sslTrustStore.isEmpty()) { transport = HiveAuthFactory.getSSLSocket(host, port, loginTimeout); } else { @@ -373,10 +399,7 @@ private TTransport createBinaryTransport() throws SQLException { } } catch (SaslException e) { throw new SQLException("Could not create secure connection to " - + jdbcURI + ": " + e.getMessage(), " 08S01", e); - } catch (TTransportException e) { - throw new SQLException("Could not create connection to " - + jdbcURI + ": " + e.getMessage(), " 08S01", e); + + jdbcUriString + ": " + e.getMessage(), " 08S01", e); } return transport; } @@ -385,7 +408,7 @@ private TTransport createBinaryTransport() throws SQLException { private String getClientDelegationToken(Map jdbcConnConf) throws SQLException { String tokenStr = null; - if (HIVE_AUTH_TOKEN.equalsIgnoreCase(jdbcConnConf.get(HIVE_AUTH_TYPE))) { + if (JdbcConnectionParams.AUTH_TOKEN.equalsIgnoreCase(jdbcConnConf.get(JdbcConnectionParams.AUTH_TYPE))) { // check delegation token in job conf if any try { tokenStr = ShimLoader.getHadoopShims(). 
@@ -397,7 +420,7 @@ private String getClientDelegationToken(Map jdbcConnConf) return tokenStr; } - private void openSession(Utils.JdbcConnectionParams connParams) throws SQLException { + private void openSession() throws SQLException { TOpenSessionReq openReq = new TOpenSessionReq(); Map openConf = new HashMap(); @@ -433,7 +456,7 @@ private void openSession(Utils.JdbcConnectionParams connParams) throws SQLExcept } catch (TException e) { LOG.error("Error opening session", e); throw new SQLException("Could not establish connection to " - + jdbcURI + ": " + e.getMessage(), " 08S01", e); + + jdbcUriString + ": " + e.getMessage(), " 08S01", e); } isClosed = false; } @@ -442,27 +465,27 @@ private void openSession(Utils.JdbcConnectionParams connParams) throws SQLExcept * @return username from sessConfMap */ private String getUserName() { - return getSessionValue(HIVE_AUTH_USER, HIVE_ANONYMOUS_USER); + return getSessionValue(JdbcConnectionParams.AUTH_USER, JdbcConnectionParams.ANONYMOUS_USER); } /** * @return password from sessConfMap */ private String getPassword() { - return getSessionValue(HIVE_AUTH_PASSWD, HIVE_ANONYMOUS_PASSWD); + return getSessionValue(JdbcConnectionParams.AUTH_PASSWD, JdbcConnectionParams.ANONYMOUS_PASSWD); } private boolean isSslConnection() { - return "true".equalsIgnoreCase(sessConfMap.get(HIVE_USE_SSL)); + return "true".equalsIgnoreCase(sessConfMap.get(JdbcConnectionParams.USE_SSL)); } private boolean isKerberosAuthMode() { - return !HIVE_AUTH_SIMPLE.equals(sessConfMap.get(HIVE_AUTH_TYPE)) - && sessConfMap.containsKey(HIVE_AUTH_PRINCIPAL); + return !JdbcConnectionParams.AUTH_SIMPLE.equals(sessConfMap.get(JdbcConnectionParams.AUTH_TYPE)) + && sessConfMap.containsKey(JdbcConnectionParams.AUTH_PRINCIPAL); } private boolean isHttpTransportMode() { - String transportMode = hiveConfMap.get(HIVE_SERVER2_TRANSPORT_MODE); + String transportMode = hiveConfMap.get(JdbcConnectionParams.TRANSPORT_MODE); if(transportMode != null && 
(transportMode.equalsIgnoreCase("http"))) { return true; } diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveDriver.java b/jdbc/src/java/org/apache/hive/jdbc/HiveDriver.java index 6e248d6..396c314 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/HiveDriver.java +++ b/jdbc/src/java/org/apache/hive/jdbc/HiveDriver.java @@ -230,7 +230,12 @@ private Properties parseURLforPropertyInfo(String url, Properties defaults) thro throw new SQLException("Invalid connection url: " + url); } - JdbcConnectionParams params = Utils.parseURL(url); + JdbcConnectionParams params = null; + try { + params = Utils.parseURL(url); + } catch (ZooKeeperHiveClientException e) { + throw new SQLException(e); + } String host = params.getHost(); if (host == null){ host = ""; @@ -239,7 +244,7 @@ private Properties parseURLforPropertyInfo(String url, Properties defaults) thro if(host.equals("")){ port = ""; } - else if(port.equals("0")){ + else if(port.equals("0") || port.equals("-1")){ port = Utils.DEFAULT_PORT; } String db = params.getDbName(); diff --git a/jdbc/src/java/org/apache/hive/jdbc/JdbcUriParseException.java b/jdbc/src/java/org/apache/hive/jdbc/JdbcUriParseException.java new file mode 100644 index 0000000..6bb2e20 --- /dev/null +++ b/jdbc/src/java/org/apache/hive/jdbc/JdbcUriParseException.java @@ -0,0 +1,45 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. See accompanying LICENSE file. 
+ */ + +package org.apache.hive.jdbc; + +import java.sql.SQLException; + +public class JdbcUriParseException extends SQLException { + + private static final long serialVersionUID = 0; + + /** + * @param cause (original exception) + */ + public JdbcUriParseException(Throwable cause) { + super(cause); + } + + /** + * @param msg (exception message) + */ + public JdbcUriParseException(String msg) { + super(msg); + } + + /** + * @param msg (exception message) + * @param cause (original exception) + */ + public JdbcUriParseException(String msg, Throwable cause) { + super(msg, cause); + } + +} diff --git a/jdbc/src/java/org/apache/hive/jdbc/Utils.java b/jdbc/src/java/org/apache/hive/jdbc/Utils.java index 58339bf..e6b1a36 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/Utils.java +++ b/jdbc/src/java/org/apache/hive/jdbc/Utils.java @@ -19,17 +19,23 @@ package org.apache.hive.jdbc; import java.net.URI; +import java.net.URISyntaxException; import java.sql.SQLException; +import java.util.ArrayList; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hive.service.cli.HiveSQLException; import org.apache.hive.service.cli.thrift.TStatus; import org.apache.hive.service.cli.thrift.TStatusCode; public class Utils { + public static final Log LOG = LogFactory.getLog(Utils.class.getName()); /** * The required prefix for the connection URL. */ @@ -47,14 +53,58 @@ private static final String URI_JDBC_PREFIX = "jdbc:"; + private static final String URI_HIVE_PREFIX = "hive2:"; + public static class JdbcConnectionParams { + // Note on client side parameter naming convention: + // Prefer using a shorter camelCase param name instead of using the same name as the + // corresponding + // HiveServer2 config. 
+ // For a jdbc url: jdbc:hive2://:/dbName;sess_var_list?hive_conf_list#hive_var_list, + // client side params are specified in sess_var_list + + // Client param names: + static final String AUTH_TYPE = "auth"; + static final String AUTH_QOP = "sasl.qop"; + static final String AUTH_SIMPLE = "noSasl"; + static final String AUTH_TOKEN = "delegationToken"; + static final String AUTH_USER = "user"; + static final String AUTH_PRINCIPAL = "principal"; + static final String AUTH_PASSWD = "password"; + static final String AUTH_KERBEROS_AUTH_TYPE = "kerberosAuthType"; + static final String AUTH_KERBEROS_AUTH_TYPE_FROM_SUBJECT = "fromSubject"; + static final String ANONYMOUS_USER = "anonymous"; + static final String ANONYMOUS_PASSWD = "anonymous"; + static final String USE_SSL = "ssl"; + static final String SSL_TRUST_STORE = "sslTrustStore"; + static final String SSL_TRUST_STORE_PASSWORD = "trustStorePassword"; + static final String TRANSPORT_MODE = "hive.server2.transport.mode"; + static final String HTTP_PATH = "hive.server2.thrift.http.path"; + static final String SERVICE_DISCOVERY_MODE = "serviceDiscoveryMode"; + // Don't use dynamic serice discovery + static final String SERVICE_DISCOVERY_MODE_NONE = "none"; + // Use ZooKeeper for indirection while using dynamic service discovery + static final String SERVICE_DISCOVERY_MODE_ZOOKEEPER = "zooKeeper"; + static final String ZOOKEEPER_NAMESPACE = "zooKeeperNamespace"; + + // Non-configurable params: + // ZOOKEEPER_SESSION_TIMEOUT is not exposed as client configurable + static final int ZOOKEEPER_SESSION_TIMEOUT = 600 * 1000; + // Currently supports JKS keystore format + static final String SSL_TRUST_STORE_TYPE = "JKS"; + private String host = null; private int port; + private String jdbcUriString; private String dbName = DEFAULT_DATABASE; private Map hiveConfs = new LinkedHashMap(); private Map hiveVars = new LinkedHashMap(); private Map sessionVars = new LinkedHashMap(); private boolean isEmbeddedMode = false; + private 
String[] authorityList; + private String zooKeeperEnsemble = null; + private String currentHostZnodePath; + private List rejectedHostZnodePaths = new ArrayList(); public JdbcConnectionParams() { } @@ -62,46 +112,94 @@ public JdbcConnectionParams() { public String getHost() { return host; } + public int getPort() { return port; } + + public String getJdbcUriString() { + return jdbcUriString; + } + public String getDbName() { return dbName; } + public Map getHiveConfs() { return hiveConfs; } - public Map getHiveVars() { + + public Map getHiveVars() { return hiveVars; } + public boolean isEmbeddedMode() { return isEmbeddedMode; } + public Map getSessionVars() { return sessionVars; } + public String[] getAuthorityList() { + return authorityList; + } + + public String getZooKeeperEnsemble() { + return zooKeeperEnsemble; + } + + public List getRejectedHostZnodePaths() { + return rejectedHostZnodePaths; + } + + public String getCurrentHostZnodePath() { + return currentHostZnodePath; + } + public void setHost(String host) { this.host = host; } + public void setPort(int port) { this.port = port; } + + public void setJdbcUriString(String jdbcUriString) { + this.jdbcUriString = jdbcUriString; + } + public void setDbName(String dbName) { this.dbName = dbName; } + public void setHiveConfs(Map hiveConfs) { this.hiveConfs = hiveConfs; } - public void setHiveVars(Map hiveVars) { + + public void setHiveVars(Map hiveVars) { this.hiveVars = hiveVars; } + public void setEmbeddedMode(boolean embeddedMode) { this.isEmbeddedMode = embeddedMode; } + public void setSessionVars(Map sessionVars) { this.sessionVars = sessionVars; } + + public void setSuppliedAuthorityList(String[] authorityList) { + this.authorityList = authorityList; + } + + public void setZooKeeperEnsemble(String zooKeeperEnsemble) { + this.zooKeeperEnsemble = zooKeeperEnsemble; + } + + public void setCurrentHostZnodePath(String currentHostZnodePath) { + this.currentHostZnodePath = currentHostZnodePath; + } } // Verify 
success or success_with_info status, else throw SQLException @@ -124,27 +222,33 @@ public static void verifySuccess(TStatus status, boolean withInfo) throws SQLExc /** * Parse JDBC connection URL - * The new format of the URL is jdbc:hive2://:/dbName;sess_var_list?hive_conf_list#hive_var_list - * where the optional sess, conf and var lists are semicolon separated = pairs. As before, if the - * host/port is not specified, it the driver runs an embedded hive. + * The new format of the URL is: + * jdbc:hive2://:,:/dbName;sess_var_list?hive_conf_list#hive_var_list + * where the optional sess, conf and var lists are semicolon separated = pairs. + * For utilizing dynamic service discovery with HiveServer2 multiple comma separated host:port pairs can + * be specified as shown above. + * The JDBC driver resolves the list of uris and picks a specific server instance to connect to. + * Currently, dynamic service discovery using ZooKeeper is supported, in which case the host:port pairs represent a ZooKeeper ensemble. + * + * As before, if the host/port is not specified, the driver runs an embedded hive. + * examples - * jdbc:hive2://ubuntu:11000/db2?hive.cli.conf.printheader=true;hive.exec.mode.local.auto.inputbytes.max=9999#stab=salesTable;icol=customerID * jdbc:hive2://?hive.cli.conf.printheader=true;hive.exec.mode.local.auto.inputbytes.max=9999#stab=salesTable;icol=customerID * jdbc:hive2://ubuntu:11000/db2;user=foo;password=bar * * Connect to http://server:10001/hs2, with specified basicAuth credentials and initial database: - * jdbc:hive2://server:10001/db;user=foo;password=bar?hive.server2.transport.mode=http;hive.server2.thrift.http.path=hs2 - * - * Note that currently the session properties are not used. 
+ * jdbc:hive2://server:10001/db;user=foo;password=bar?hive.server2.transport.mode=http;hive.server2.thrift.http.path=hs2 * * @param uri * @return + * @throws SQLException */ - public static JdbcConnectionParams parseURL(String uri) throws IllegalArgumentException { + public static JdbcConnectionParams parseURL(String uri) throws JdbcUriParseException, + SQLException, ZooKeeperHiveClientException { JdbcConnectionParams connParams = new JdbcConnectionParams(); if (!uri.startsWith(URL_PREFIX)) { - throw new IllegalArgumentException("Bad URL format: Missing prefix " + URL_PREFIX); + throw new JdbcUriParseException("Bad URL format: Missing prefix " + URL_PREFIX); } // For URLs with no other configuration @@ -154,29 +258,28 @@ public static JdbcConnectionParams parseURL(String uri) throws IllegalArgumentEx return connParams; } - URI jdbcURI = URI.create(uri.substring(URI_JDBC_PREFIX.length())); - - // Check to prevent unintentional use of embedded mode. A missing "/" - // to separate the 'path' portion of URI can result in this. - // The missing "/" common typo while using secure mode, eg of such url - - // jdbc:hive2://localhost:10000;principal=hive/HiveServer2Host@YOUR-REALM.COM - if((jdbcURI.getAuthority() != null) && (jdbcURI.getHost()==null)) { - throw new IllegalArgumentException("Bad URL format. Hostname not found " - + " in authority part of the url: " + jdbcURI.getAuthority() - + ". Are you missing a '/' after the hostname ?"); - } - - connParams.setHost(jdbcURI.getHost()); - if (connParams.getHost() == null) { + // The JDBC URI now supports specifying multiple host:port if dynamic service discovery is + // configured on HiveServer2 (like: host1:port1,host2:port2,host3:port3) + // We'll extract the authorities (host:port combo) from the URI, extract session vars, hive + // confs & hive vars by parsing it as a Java URI. + // To parse the intermediate URI as a Java URI, we'll give a dummy authority(dummy:00000). 
+ // Later, we'll substitute the dummy authority for a resolved authority. + String dummyAuthorityString = "dummyhost:00000"; + String suppliedAuthorities = getAuthorities(uri, connParams); + if ((suppliedAuthorities == null) || (suppliedAuthorities.isEmpty())) { + // Given uri of the form: + // jdbc:hive2:///dbName;sess_var_list?hive_conf_list#hive_var_list connParams.setEmbeddedMode(true); } else { - int port = jdbcURI.getPort(); - if (port == -1) { - port = Integer.valueOf(DEFAULT_PORT); - } - connParams.setPort(port); + LOG.info("Supplied authorities: " + suppliedAuthorities); + String[] authorityList = suppliedAuthorities.split(","); + connParams.setSuppliedAuthorityList(authorityList); + uri = uri.replace(suppliedAuthorities, dummyAuthorityString); } + // Now parse the connection uri with dummy authority + URI jdbcURI = URI.create(uri.substring(URI_JDBC_PREFIX.length())); + // key=value pattern Pattern pattern = Pattern.compile("([^;]*)=([^;]*)[;]?"); @@ -192,12 +295,13 @@ public static JdbcConnectionParams parseURL(String uri) throws IllegalArgumentEx } else { // we have dbname followed by session parameters dbName = sessVars.substring(0, sessVars.indexOf(';')); - sessVars = sessVars.substring(sessVars.indexOf(';')+1); + sessVars = sessVars.substring(sessVars.indexOf(';') + 1); if (sessVars != null) { Matcher sessMatcher = pattern.matcher(sessVars); while (sessMatcher.find()) { if (connParams.getSessionVars().put(sessMatcher.group(1), sessMatcher.group(2)) != null) { - throw new IllegalArgumentException("Bad URL format: Multiple values for property " + sessMatcher.group(1)); + throw new JdbcUriParseException("Bad URL format: Multiple values for property " + + sessMatcher.group(1)); } } } @@ -225,10 +329,146 @@ public static JdbcConnectionParams parseURL(String uri) throws IllegalArgumentEx } } + // Extract host, port + if (connParams.isEmbeddedMode()) { + // In case of embedded mode we were supplied with an empty authority. 
+ // So we never substituted the authority with a dummy one. + connParams.setHost(jdbcURI.getHost()); + connParams.setPort(jdbcURI.getPort()); + } else { + // Else substitute the dummy authority with a resolved one. + // In case of dynamic service discovery using ZooKeeper, it picks a server uri from ZooKeeper + String resolvedAuthorityString = resolveAuthority(connParams); + uri = uri.replace(dummyAuthorityString, resolvedAuthorityString); + connParams.setJdbcUriString(uri); + // Create a Java URI from the resolved URI for extracting the host/port + URI resolvedAuthorityURI = null; + try { + resolvedAuthorityURI = new URI(null, resolvedAuthorityString, null, null, null); + } catch (URISyntaxException e) { + throw new JdbcUriParseException("Bad URL format: ", e); + } + connParams.setHost(resolvedAuthorityURI.getHost()); + connParams.setPort(resolvedAuthorityURI.getPort()); + } + return connParams; } /** + * Get the authority string from the supplied uri, which could potentially contain multiple + * host:port pairs. 
+ * + * @param uri + * @param connParams + * @return + * @throws JdbcUriParseException + */ + private static String getAuthorities(String uri, JdbcConnectionParams connParams) + throws JdbcUriParseException { + String authorities; + // For a jdbc uri like: jdbc:hive2://host1:port1,host2:port2,host3:port3/ + // Extract the uri host:port list starting after "jdbc:hive2://", till the 1st "/" or EOL + int fromIndex = Utils.URL_PREFIX.length(); + int toIndex = uri.indexOf("/", fromIndex); + if (toIndex < 0) { + authorities = uri.substring(fromIndex); + } else { + authorities = uri.substring(fromIndex, uri.indexOf("/", fromIndex)); + } + return authorities; + } + + /** + * Get a string representing a specific host:port + * @param connParams + * @return + * @throws JdbcUriParseException + * @throws ZooKeeperHiveClientException + */ + private static String resolveAuthority(JdbcConnectionParams connParams) + throws JdbcUriParseException, ZooKeeperHiveClientException { + String serviceDiscoveryMode = + connParams.getSessionVars().get(JdbcConnectionParams.SERVICE_DISCOVERY_MODE); + if ((serviceDiscoveryMode != null) + && (JdbcConnectionParams.SERVICE_DISCOVERY_MODE_ZOOKEEPER + .equalsIgnoreCase(serviceDiscoveryMode))) { + // Resolve using ZooKeeper + return resolveAuthorityUsingZooKeeper(connParams); + } else { + String authority = connParams.getAuthorityList()[0]; + URI jdbcURI = URI.create(URI_HIVE_PREFIX + "//" + authority); + // Check to prevent unintentional use of embedded mode. A missing "/" + // to separate the 'path' portion of URI can result in this. + // The missing "/" common typo while using secure mode, eg of such url - + // jdbc:hive2://localhost:10000;principal=hive/HiveServer2Host@YOUR-REALM.COM + if ((jdbcURI.getAuthority() != null) && (jdbcURI.getHost() == null)) { + throw new JdbcUriParseException("Bad URL format. Hostname not found " + + " in authority part of the url: " + jdbcURI.getAuthority() + + ". 
Are you missing a '/' after the hostname ?"); + } + // Return the 1st element of the array + return jdbcURI.getAuthority(); + } + } + + /** + * Read a specific host:port from ZooKeeper + * @param connParams + * @return + * @throws ZooKeeperHiveClientException + */ + private static String resolveAuthorityUsingZooKeeper(JdbcConnectionParams connParams) + throws ZooKeeperHiveClientException { + // Set ZooKeeper ensemble in connParams for later use + connParams.setZooKeeperEnsemble(joinStringArray(connParams.getAuthorityList(), ",")); + return ZooKeeperHiveClientHelper.getNextServerUriFromZooKeeper(connParams); + } + + /** + * Read the next server coordinates (host:port combo) from ZooKeeper. Ignore the znodes already + * explored. Also update the host, port, jdbcUriString fields of connParams. + * + * @param connParams + * @throws ZooKeeperHiveClientException + */ + static void updateConnParamsFromZooKeeper(JdbcConnectionParams connParams) + throws ZooKeeperHiveClientException { + // Add current host to the rejected list + connParams.getRejectedHostZnodePaths().add(connParams.getCurrentHostZnodePath()); + // Get another HiveServer2 uri from ZooKeeper + String serverUriString = ZooKeeperHiveClientHelper.getNextServerUriFromZooKeeper(connParams); + // Parse serverUri to a java URI and extract host, port + URI serverUri = null; + try { + // Note URL_PREFIX is not a valid scheme format, therefore leaving it null in the constructor + // to construct a valid URI + serverUri = new URI(null, serverUriString, null, null, null); + } catch (URISyntaxException e) { + throw new ZooKeeperHiveClientException(e); + } + String oldServerHost = connParams.getHost(); + int oldServerPort = connParams.getPort(); + String newServerHost = serverUri.getHost(); + int newServerPort = serverUri.getPort(); + connParams.setHost(newServerHost); + connParams.setPort(newServerPort); + connParams.setJdbcUriString(connParams.getJdbcUriString().replace( + oldServerHost + ":" + oldServerPort, 
newServerHost + ":" + newServerPort)); + } + + private static String joinStringArray(String[] stringArray, String seperator) { + StringBuilder stringBuilder = new StringBuilder(); + for (int cur = 0, end = stringArray.length; cur < end; cur++) { + if (cur > 0) { + stringBuilder.append(seperator); + } + stringBuilder.append(stringArray[cur]); + } + return stringBuilder.toString(); + } + + /** * Takes a version string delimited by '.' and '-' characters * and returns a partial version. * diff --git a/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientException.java b/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientException.java new file mode 100644 index 0000000..186c676 --- /dev/null +++ b/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientException.java @@ -0,0 +1,43 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. See accompanying LICENSE file. 
+ */ + +package org.apache.hive.jdbc; + +public class ZooKeeperHiveClientException extends Exception { + + private static final long serialVersionUID = 0; + + /** + * @param cause (original exception) + */ + public ZooKeeperHiveClientException(Throwable cause) { + super(cause); + } + + /** + * @param msg (exception message) + */ + public ZooKeeperHiveClientException(String msg) { + super(msg); + } + + /** + * @param msg (exception message) + * @param cause (original exception) + */ + public ZooKeeperHiveClientException(String msg, Throwable cause) { + super(msg, cause); + } + +} diff --git a/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java b/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java new file mode 100644 index 0000000..06795a5 --- /dev/null +++ b/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hive.jdbc; + +import java.nio.charset.Charset; +import java.sql.SQLException; +import java.util.List; +import java.util.Random; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hive.jdbc.Utils.JdbcConnectionParams; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooKeeper; + +public class ZooKeeperHiveClientHelper { + public static final Log LOG = LogFactory.getLog(ZooKeeperHiveClientHelper.class.getName()); + + /** + * A no-op watcher class + */ + public static class DummyWatcher implements Watcher { + public void process(org.apache.zookeeper.WatchedEvent event) { + } + } + + /** + * Resolve to a host:port by connecting to ZooKeeper and picking a host randomly. + * + * @param connParams + * @return + * @throws ZooKeeperHiveClientException + */ + static String getNextServerUriFromZooKeeper(JdbcConnectionParams connParams) + throws ZooKeeperHiveClientException { + String zooKeeperEnsemble = connParams.getZooKeeperEnsemble(); + String zooKeeperNamespace = + connParams.getSessionVars().get(JdbcConnectionParams.ZOOKEEPER_NAMESPACE); + List serverHosts; + Random randomizer = new Random(); + String serverNode; + // Pick a random HiveServer2 host from the ZooKeeper namespace + try { + ZooKeeper zooKeeperClient = + new ZooKeeper(zooKeeperEnsemble, JdbcConnectionParams.ZOOKEEPER_SESSION_TIMEOUT, + new ZooKeeperHiveClientHelper.DummyWatcher()); + // All the HiveServer2 host nodes that are in ZooKeeper currently + serverHosts = zooKeeperClient.getChildren("/" + zooKeeperNamespace, false); + // Remove the znodes we've already tried from this list + serverHosts.removeAll(connParams.getRejectedHostZnodePaths()); + if (serverHosts.isEmpty()) { + throw new ZooKeeperHiveClientException( + "Tried all existing HiveServer2 uris from ZooKeeper."); + } + // Now pick a host randomly + serverNode = serverHosts.get(randomizer.nextInt(serverHosts.size())); + 
connParams.setCurrentHostZnodePath(serverNode); + // Read the value from the node (UTF-8 encoded byte array) and convert it to a String + String serverUri = + new String(zooKeeperClient.getData("/" + zooKeeperNamespace + "/" + serverNode, false, + null), Charset.forName("UTF-8")); + LOG.info("Selected HiveServer2 instance with uri: " + serverUri); + return serverUri; + } catch (Exception e) { + throw new ZooKeeperHiveClientException("Unable to read HiveServer2 uri from ZooKeeper", e); + } + } + +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java index 0919d2f..1334a91 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject.HiveLockObjectData; import org.apache.hadoop.hive.ql.metadata.*; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; +import org.apache.hadoop.hive.ql.util.ZooKeeperHiveHelper; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.KeeperException; @@ -73,31 +74,6 @@ public ZooKeeperHiveLockManager() { } /**
- **/ - @VisibleForTesting - static String getQuorumServers(HiveConf conf) { - String[] hosts = conf.getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_QUORUM).split(","); - String port = conf.getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT); - StringBuilder quorum = new StringBuilder(); - for(int i=0; i locks = getLocks(conf, zkpClient, null, parent, false, false); @@ -629,7 +605,8 @@ public static void releaseAllLocks(HiveConf conf) throws Exception { if (fetchData) { try { - data = new HiveLockObjectData(new String(zkpClient.getData(curChild, new DummyWatcher(), null))); + data = new HiveLockObjectData(new String(zkpClient.getData(curChild, + new ZooKeeperHiveHelper.DummyWatcher(), null))); data.setClientIp(clientIp); } catch (Exception e) { LOG.error("Error in getting data for " + curChild, e); @@ -789,11 +766,6 @@ private static HiveLockMode getLockMode(HiveConf conf, String path) { return null; } - public static class DummyWatcher implements Watcher { - public void process(org.apache.zookeeper.WatchedEvent event) { - } - } - @Override public void prepareRetry() throws LockException { try { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/util/ZooKeeperHiveHelper.java b/ql/src/java/org/apache/hadoop/hive/ql/util/ZooKeeperHiveHelper.java new file mode 100644 index 0000000..d9faa45 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/util/ZooKeeperHiveHelper.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.util; + +import java.util.List; + +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.ACL; + +public class ZooKeeperHiveHelper { + public static final Log LOG = LogFactory.getLog(ZooKeeperHiveHelper.class.getName()); + public static final String ZOOKEEPER_PATH_SEPARATOR = "/"; + /** + * Get the ensemble server addresses from the configuration. The format is: host1:port, + * host2:port.. 
+ * + * @param conf + **/ + public static String getQuorumServers(HiveConf conf) { + String[] hosts = conf.getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_QUORUM).split(","); + String port = conf.getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT); + StringBuilder quorum = new StringBuilder(); + for (int i = 0; i < hosts.length; i++) { + quorum.append(hosts[i].trim()); + if (!hosts[i].contains(":")) { + // if the hostname doesn't contain a port, add the configured port to hostname + quorum.append(":"); + quorum.append(port); + } + + if (i != hosts.length - 1) + quorum.append(","); + } + + return quorum.toString(); + } + + + /** + * Create a path on ZooKeeper, if it does not already exist ("mkdir -p") + * + * @param zooKeeperClient ZooKeeper session + * @param path string with ZOOKEEPER_PATH_SEPARATOR as the separator + * @param acl list of ACL entries + * @param createMode for create mode of each node in the patch + * @return + * @throws KeeperException + * @throws InterruptedException + */ + public static String createPathRecursively(ZooKeeper zooKeeperClient, String path, List acl, + CreateMode createMode) throws KeeperException, InterruptedException { + String[] pathComponents = StringUtils.splitByWholeSeparator(path, ZOOKEEPER_PATH_SEPARATOR); + String currentPath = ""; + for (String pathComponent : pathComponents) { + currentPath += ZOOKEEPER_PATH_SEPARATOR + pathComponent; + try { + String node = zooKeeperClient.create(currentPath, new byte[0], acl, createMode); + LOG.info("Created path: " + node); + } catch (KeeperException.NodeExistsException e) { + // Do nothing here + } + } + return currentPath; + } + + /** + * A no-op watcher class + */ + public static class DummyWatcher implements Watcher { + public void process(org.apache.zookeeper.WatchedEvent event) { + } + } + +} diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/zookeeper/TestZookeeperLockManager.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/zookeeper/TestZookeeperLockManager.java index 
59294b1..aacb73f 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/zookeeper/TestZookeeperLockManager.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/zookeeper/TestZookeeperLockManager.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.lockmgr.HiveLockMode; import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject; +import org.apache.hadoop.hive.ql.util.ZooKeeperHiveHelper; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooKeeper; import org.junit.Assert; @@ -87,14 +88,14 @@ public void testDeleteWithChildren() throws Exception { public void testGetQuorumServers() { conf.setVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_QUORUM, "node1"); conf.setVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT, "9999"); - Assert.assertEquals("node1:9999", ZooKeeperHiveLockManager.getQuorumServers(conf)); + Assert.assertEquals("node1:9999", ZooKeeperHiveHelper.getQuorumServers(conf)); conf.setVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_QUORUM, "node1,node2,node3"); conf.setVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT, "9999"); - Assert.assertEquals("node1:9999,node2:9999,node3:9999", ZooKeeperHiveLockManager.getQuorumServers(conf)); + Assert.assertEquals("node1:9999,node2:9999,node3:9999", ZooKeeperHiveHelper.getQuorumServers(conf)); conf.setVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_QUORUM, "node1:5666,node2,node3"); conf.setVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT, "9999"); - Assert.assertEquals("node1:5666,node2:9999,node3:9999", ZooKeeperHiveLockManager.getQuorumServers(conf)); + Assert.assertEquals("node1:5666,node2:9999,node3:9999", ZooKeeperHiveHelper.getQuorumServers(conf)); } } diff --git a/service/src/java/org/apache/hive/service/cli/CLIService.java b/service/src/java/org/apache/hive/service/cli/CLIService.java index a0bc905..b46c5b4 100644 --- a/service/src/java/org/apache/hive/service/cli/CLIService.java +++ b/service/src/java/org/apache/hive/service/cli/CLIService.java @@ -66,7 +66,7 @@ 
private UserGroupInformation httpUGI; public CLIService() { - super("CLIService"); + super(CLIService.class.getSimpleName()); } @Override @@ -201,8 +201,7 @@ public SessionHandle openSessionWithImpersonation(String username, String passwo * @see org.apache.hive.service.cli.ICLIService#closeSession(org.apache.hive.service.cli.SessionHandle) */ @Override - public void closeSession(SessionHandle sessionHandle) - throws HiveSQLException { + public void closeSession(SessionHandle sessionHandle) throws HiveSQLException { sessionManager.closeSession(sessionHandle); LOG.debug(sessionHandle + ": closeSession()"); } @@ -470,4 +469,8 @@ public void renewDelegationToken(SessionHandle sessionHandle, HiveAuthFactory au sessionManager.getSession(sessionHandle).renewDelegationToken(authFactory, tokenStr); LOG.info(sessionHandle + ": renewDelegationToken()"); } + + public SessionManager getSessionManager() { + return sessionManager; + } } diff --git a/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java b/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java index f5a8f27..a57b6e5 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java +++ b/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java @@ -47,7 +47,7 @@ new HashMap(); public OperationManager() { - super("OperationManager"); + super(OperationManager.class.getSimpleName()); } @Override diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java index b0bb8be..5231d5e 100644 --- a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java +++ b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java @@ -166,8 +166,8 @@ private void processGlobalInitFile() { IHiveFileProcessor processor = new GlobalHivercFileProcessor(); try { - if (hiveConf.getVar(ConfVars.HIVE_GLOBAL_INIT_FILE_LOCATION) != 
null) { - String hiverc = hiveConf.getVar(ConfVars.HIVE_GLOBAL_INIT_FILE_LOCATION) + if (hiveConf.getVar(ConfVars.HIVE_SERVER2_GLOBAL_INIT_FILE_LOCATION) != null) { + String hiverc = hiveConf.getVar(ConfVars.HIVE_SERVER2_GLOBAL_INIT_FILE_LOCATION) + File.separator + SessionManager.HIVERCFILE; if (new File(hiverc).exists()) { LOG.info("Running global init file: " + hiverc); diff --git a/service/src/java/org/apache/hive/service/cli/session/SessionManager.java b/service/src/java/org/apache/hive/service/cli/session/SessionManager.java index 11d25cc..4654acc 100644 --- a/service/src/java/org/apache/hive/service/cli/session/SessionManager.java +++ b/service/src/java/org/apache/hive/service/cli/session/SessionManager.java @@ -67,7 +67,7 @@ private volatile boolean shutdown; public SessionManager() { - super("SessionManager"); + super(SessionManager.class.getSimpleName()); } @Override @@ -356,5 +356,9 @@ private void executeSessionHooks(HiveSession session) throws Exception { return backgroundOperationPool.submit(r); } + public int getOpenSessionCount() { + return handleToSession.size(); + } + } diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java index 2b80adc..028d55e 100644 --- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java +++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java @@ -18,7 +18,6 @@ package org.apache.hive.service.cli.thrift; -import java.net.InetSocketAddress; import java.util.concurrent.ExecutorService; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -40,72 +39,54 @@ public class ThriftBinaryCLIService extends ThriftCLIService { public ThriftBinaryCLIService(CLIService cliService) { - super(cliService, "ThriftBinaryCLIService"); + super(cliService, ThriftBinaryCLIService.class.getSimpleName()); } @Override public void 
run() { try { - hiveAuthFactory = new HiveAuthFactory(hiveConf); - TTransportFactory transportFactory = hiveAuthFactory.getAuthTransFactory(); - TProcessorFactory processorFactory = hiveAuthFactory.getAuthProcFactory(this); - - String portString = System.getenv("HIVE_SERVER2_THRIFT_PORT"); - if (portString != null) { - portNum = Integer.valueOf(portString); - } else { - portNum = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_PORT); - } - - String hiveHost = System.getenv("HIVE_SERVER2_THRIFT_BIND_HOST"); - if (hiveHost == null) { - hiveHost = hiveConf.getVar(ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST); - } - - if (hiveHost != null && !hiveHost.isEmpty()) { - serverAddress = new InetSocketAddress(hiveHost, portNum); - } else { - serverAddress = new InetSocketAddress(portNum); - } - - minWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MIN_WORKER_THREADS); - maxWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MAX_WORKER_THREADS); - workerKeepAliveTime = hiveConf.getTimeVar( - ConfVars.HIVE_SERVER2_THRIFT_WORKER_KEEPALIVE_TIME, TimeUnit.SECONDS); + // Server thread pool String threadPoolName = "HiveServer2-Handler-Pool"; ExecutorService executorService = new ThreadPoolExecutor(minWorkerThreads, maxWorkerThreads, workerKeepAliveTime, TimeUnit.SECONDS, new SynchronousQueue(), new ThreadFactoryWithGarbageCleanup(threadPoolName)); + // Thrift configs + hiveAuthFactory = new HiveAuthFactory(hiveConf); + TTransportFactory transportFactory = hiveAuthFactory.getAuthTransFactory(); + TProcessorFactory processorFactory = hiveAuthFactory.getAuthProcFactory(this); TServerSocket serverSocket = null; if (!hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_USE_SSL)) { serverSocket = HiveAuthFactory.getServerSocket(hiveHost, portNum); } else { String keyStorePath = hiveConf.getVar(ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PATH).trim(); if (keyStorePath.isEmpty()) { - throw new IllegalArgumentException(ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PATH.varname + - " Not configured 
for SSL connection"); + throw new IllegalArgumentException(ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PATH.varname + + " Not configured for SSL connection"); } String keyStorePassword = ShimLoader.getHadoopShims().getPassword(hiveConf, HiveConf.ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PASSWORD.varname); - serverSocket = HiveAuthFactory.getServerSSLSocket(hiveHost, portNum, - keyStorePath, keyStorePassword); + serverSocket = HiveAuthFactory.getServerSSLSocket(hiveHost, portNum, keyStorePath, + keyStorePassword); } + + // Server args TThreadPoolServer.Args sargs = new TThreadPoolServer.Args(serverSocket) - .processorFactory(processorFactory) - .transportFactory(transportFactory) - .protocolFactory(new TBinaryProtocol.Factory()) - .executorService(executorService); + .processorFactory(processorFactory).transportFactory(transportFactory) + .protocolFactory(new TBinaryProtocol.Factory()).executorService(executorService); + // TCP Server server = new TThreadPoolServer(sargs); - - LOG.info("ThriftBinaryCLIService listening on " + serverAddress); - server.serve(); - + String msg = "Started " + ThriftBinaryCLIService.class.getSimpleName() + " on port " + + portNum + " with " + minWorkerThreads + "..." 
+ maxWorkerThreads + " worker threads"; + LOG.info(msg); } catch (Throwable t) { - LOG.error("Error: ", t); + LOG.fatal( + "Error starting HiveServer2: could not start " + + ThriftBinaryCLIService.class.getSimpleName(), t); + System.exit(-1); } - } + } diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java index 443c371..c4b273c 100644 --- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java +++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java @@ -22,6 +22,7 @@ import java.net.InetSocketAddress; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.TimeUnit; import javax.security.auth.login.LoginException; @@ -34,6 +35,7 @@ import org.apache.hive.service.auth.TSetIpAddressProcessor; import org.apache.hive.service.cli.*; import org.apache.hive.service.cli.session.SessionManager; +import org.apache.hive.service.server.HiveServer2; import org.apache.thrift.TException; import org.apache.thrift.server.TServer; @@ -48,9 +50,11 @@ protected CLIService cliService; private static final TStatus OK_STATUS = new TStatus(TStatusCode.SUCCESS_STATUS); private static final TStatus ERROR_STATUS = new TStatus(TStatusCode.ERROR_STATUS); + protected static HiveAuthFactory hiveAuthFactory; protected int portNum; protected InetSocketAddress serverAddress; + protected String hiveHost; protected TServer server; protected org.eclipse.jetty.server.Server httpServer; @@ -62,8 +66,7 @@ protected int minWorkerThreads; protected int maxWorkerThreads; protected long workerKeepAliveTime; - - protected static HiveAuthFactory hiveAuthFactory; + private HiveServer2 hiveServer2; public ThriftCLIService(CLIService cliService, String serviceName) { super(serviceName); @@ -73,6 +76,43 @@ public ThriftCLIService(CLIService cliService, String serviceName) { @Override public synchronized void init(HiveConf hiveConf) { 
this.hiveConf = hiveConf; + + // Initialize common server configs needed in both binary & http modes + String portString; + hiveHost = System.getenv("HIVE_SERVER2_THRIFT_BIND_HOST"); + if (hiveHost == null) { + hiveHost = hiveConf.getVar(ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST); + } + // HTTP mode + if (HiveServer2.isHTTPTransportMode(hiveConf)) { + workerKeepAliveTime = + hiveConf.getTimeVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_WORKER_KEEPALIVE_TIME, + TimeUnit.SECONDS); + portString = System.getenv("HIVE_SERVER2_THRIFT_HTTP_PORT"); + if (portString != null) { + portNum = Integer.valueOf(portString); + } else { + portNum = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT); + } + } + // Binary mode + else { + workerKeepAliveTime = + hiveConf.getTimeVar(ConfVars.HIVE_SERVER2_THRIFT_WORKER_KEEPALIVE_TIME, TimeUnit.SECONDS); + portString = System.getenv("HIVE_SERVER2_THRIFT_PORT"); + if (portString != null) { + portNum = Integer.valueOf(portString); + } else { + portNum = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_PORT); + } + } + if (hiveHost != null && !hiveHost.isEmpty()) { + serverAddress = new InetSocketAddress(hiveHost, portNum); + } else { + serverAddress = new InetSocketAddress(portNum); + } + minWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MIN_WORKER_THREADS); + maxWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MAX_WORKER_THREADS); super.init(hiveConf); } @@ -105,6 +145,14 @@ public synchronized void stop() { super.stop(); } + public int getPortNumber() { + return portNum; + } + + public InetSocketAddress getServerAddress() { + return serverAddress; + } + @Override public TGetDelegationTokenResp GetDelegationToken(TGetDelegationTokenReq req) throws TException { @@ -308,6 +356,24 @@ public TCloseSessionResp CloseSession(TCloseSessionReq req) throws TException { } catch (Exception e) { LOG.warn("Error closing session: ", e); resp.setStatus(HiveSQLException.toTStatus(e)); + } finally { + if (!(isEmbedded) && 
(hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY)) + && (!hiveServer2.isRegisteredWithZooKeeper())) { + // Asynchronously shutdown this instance of HiveServer2, + // if there are no active client sessions + if (cliService.getSessionManager().getOpenSessionCount() == 0) { + LOG.info("This instance of HiveServer2 has been removed from the list of server " + + "instances available for dynamic service discovery. " + + "The last client session has ended - will shutdown now."); + Thread shutdownThread = new Thread() { + @Override + public void run() { + hiveServer2.stop(); + } + }; + shutdownThread.start(); + } + } } return resp; } @@ -591,5 +657,9 @@ private boolean isKerberosAuthMode() { .equalsIgnoreCase(HiveAuthFactory.AuthTypes.KERBEROS.toString()); } + public void setHiveServer2(HiveServer2 hiveServer2) { + this.hiveServer2 = hiveServer2; + } + } diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java index 4067106..795115e 100644 --- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java +++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java @@ -48,100 +48,94 @@ public class ThriftHttpCLIService extends ThriftCLIService { public ThriftHttpCLIService(CLIService cliService) { - super(cliService, "ThriftHttpCLIService"); + super(cliService, ThriftHttpCLIService.class.getSimpleName()); } + /** + * Configure Jetty to serve http requests. Example of a client connection URL: + * http://localhost:10000/servlets/thrifths2/ A gateway may cause actual target URL to differ, + * e.g. http://gateway:port/hive2/servlets/thrifths2/ + */ @Override public void run() { try { - // Configure Jetty to serve http requests - // Example of a client connection URL: http://localhost:10000/servlets/thrifths2/ - // a gateway may cause actual target URL to differ, e.g. 
http://gateway:port/hive2/servlets/thrifths2/ - + // Verify config validity verifyHttpConfiguration(hiveConf); - String portString = System.getenv("HIVE_SERVER2_THRIFT_HTTP_PORT"); - if (portString != null) { - portNum = Integer.valueOf(portString); - } else { - portNum = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT); - } - - minWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_MIN_WORKER_THREADS); - maxWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_MAX_WORKER_THREADS); - workerKeepAliveTime = hiveConf.getTimeVar( - ConfVars.HIVE_SERVER2_THRIFT_HTTP_WORKER_KEEPALIVE_TIME, TimeUnit.SECONDS); - - String httpPath = getHttpPath(hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH)); - + // HTTP Server httpServer = new org.eclipse.jetty.server.Server(); + + // Server thread pool String threadPoolName = "HiveServer2-HttpHandler-Pool"; ExecutorService executorService = new ThreadPoolExecutor(minWorkerThreads, maxWorkerThreads, workerKeepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue(), new ThreadFactoryWithGarbageCleanup(threadPoolName)); - ExecutorThreadPool threadPool = new ExecutorThreadPool(executorService); httpServer.setThreadPool(threadPool); - SelectChannelConnector connector = new SelectChannelConnector();; + // Connector configs + SelectChannelConnector connector = new SelectChannelConnector(); boolean useSsl = hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_USE_SSL); String schemeName = useSsl ? 
"https" : "http"; - String authType = hiveConf.getVar(ConfVars.HIVE_SERVER2_AUTHENTICATION); - // Set during the init phase of HiveServer2 if auth mode is kerberos - // UGI for the hive/_HOST (kerberos) principal - UserGroupInformation serviceUGI = cliService.getServiceUGI(); - // UGI for the http/_HOST (SPNego) principal - UserGroupInformation httpUGI = cliService.getHttpUGI(); - + // Change connector if SSL is used if (useSsl) { String keyStorePath = hiveConf.getVar(ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PATH).trim(); String keyStorePassword = ShimLoader.getHadoopShims().getPassword(hiveConf, HiveConf.ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PASSWORD.varname); if (keyStorePath.isEmpty()) { - throw new IllegalArgumentException(ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PATH.varname + - " Not configured for SSL connection"); + throw new IllegalArgumentException(ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PATH.varname + + " Not configured for SSL connection"); } SslContextFactory sslContextFactory = new SslContextFactory(); sslContextFactory.setKeyStorePath(keyStorePath); sslContextFactory.setKeyStorePassword(keyStorePassword); connector = new SslSelectChannelConnector(sslContextFactory); } - connector.setPort(portNum); // Linux:yes, Windows:no connector.setReuseAddress(!Shell.WINDOWS); - - int maxIdleTime = (int) hiveConf.getTimeVar( - ConfVars.HIVE_SERVER2_THRIFT_HTTP_MAX_IDLE_TIME, TimeUnit.MILLISECONDS); + int maxIdleTime = (int) hiveConf.getTimeVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_MAX_IDLE_TIME, + TimeUnit.MILLISECONDS); connector.setMaxIdleTime(maxIdleTime); - + httpServer.addConnector(connector); + // Thrift configs hiveAuthFactory = new HiveAuthFactory(hiveConf); TProcessorFactory processorFactory = hiveAuthFactory.getAuthProcFactory(this); TProcessor processor = processorFactory.getProcessor(null); - TProtocolFactory protocolFactory = new TBinaryProtocol.Factory(); + // Set during the init phase of HiveServer2 if auth mode is kerberos + // UGI for the hive/_HOST (kerberos) principal 
+ UserGroupInformation serviceUGI = cliService.getServiceUGI(); + // UGI for the http/_HOST (SPNego) principal + UserGroupInformation httpUGI = cliService.getHttpUGI(); + String authType = hiveConf.getVar(ConfVars.HIVE_SERVER2_AUTHENTICATION); + TServlet thriftHttpServlet = new ThriftHttpServlet(processor, protocolFactory, authType, + serviceUGI, httpUGI); - TServlet thriftHttpServlet = new ThriftHttpServlet(processor, protocolFactory, - authType, serviceUGI, httpUGI); - + // Context handler final ServletContextHandler context = new ServletContextHandler( ServletContextHandler.SESSIONS); context.setContextPath("/"); - + String httpPath = getHttpPath(hiveConf + .getVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH)); httpServer.setHandler(context); context.addServlet(new ServletHolder(thriftHttpServlet), httpPath); // TODO: check defaults: maxTimeout, keepalive, maxBodySize, bodyRecieveDuration, etc. + // Finally, start the server httpServer.start(); - String msg = "Started ThriftHttpCLIService in " + schemeName + " mode on port " + portNum + - " path=" + httpPath + - " with " + minWorkerThreads + ".." + maxWorkerThreads + " worker threads"; + String msg = "Started " + ThriftHttpCLIService.class.getSimpleName() + " in " + schemeName + + " mode on port " + portNum + " path=" + httpPath + " with " + minWorkerThreads + "..." + + maxWorkerThreads + " worker threads"; LOG.info(msg); httpServer.join(); } catch (Throwable t) { - LOG.error("Error: ", t); + LOG.fatal( + "Error starting HiveServer2: could not start " + + ThriftHttpCLIService.class.getSimpleName(), t); + System.exit(-1); } } @@ -191,7 +185,8 @@ private static void verifyHttpConfiguration(HiveConf hiveConf) { // NONE in case of thrift mode uses SASL LOG.warn(ConfVars.HIVE_SERVER2_AUTHENTICATION + " setting to " + authType + ". 
SASL is not supported with http transport mode," + - " so using equivalent of " + AuthTypes.NOSASL); + " so using equivalent of " + + AuthTypes.NOSASL); } } diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java index 124996c..c667533 100644 --- a/service/src/java/org/apache/hive/service/server/HiveServer2.java +++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java @@ -18,6 +18,8 @@ package org.apache.hive.service.server; +import java.nio.charset.Charset; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.common.LogUtils; @@ -25,12 +27,21 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager; +import org.apache.hadoop.hive.ql.util.ZooKeeperHiveHelper; import org.apache.hive.common.util.HiveStringUtils; +import org.apache.hive.common.util.HiveVersionInfo; import org.apache.hive.service.CompositeService; +import org.apache.hive.service.ServiceException; import org.apache.hive.service.cli.CLIService; import org.apache.hive.service.cli.thrift.ThriftBinaryCLIService; import org.apache.hive.service.cli.thrift.ThriftCLIService; import org.apache.hive.service.cli.thrift.ThriftHttpCLIService; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.ZooKeeper; /** * HiveServer2. 
@@ -41,9 +52,12 @@ private CLIService cliService; private ThriftCLIService thriftCLIService; + private String znodePath; + private ZooKeeper zooKeeperClient; + private boolean registeredWithZooKeeper = false; public HiveServer2() { - super("HiveServer2"); + super(HiveServer2.class.getSimpleName()); HiveConf.setLoadHiveServer2Config(true); } @@ -52,20 +66,129 @@ public HiveServer2() { public synchronized void init(HiveConf hiveConf) { cliService = new CLIService(); addService(cliService); + if (isHTTPTransportMode(hiveConf)) { + thriftCLIService = new ThriftHttpCLIService(cliService); + } else { + thriftCLIService = new ThriftBinaryCLIService(cliService); + } + addService(thriftCLIService); + thriftCLIService.setHiveServer2(this); + super.init(hiveConf); + // Add a shutdown hook for catching SIGTERM & SIGINT + final HiveServer2 hiveServer2 = this; + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + hiveServer2.stop(); + } + }); + } + + public static boolean isHTTPTransportMode(HiveConf hiveConf) { String transportMode = System.getenv("HIVE_SERVER2_TRANSPORT_MODE"); - if(transportMode == null) { + if (transportMode == null) { transportMode = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_TRANSPORT_MODE); } - if(transportMode != null && (transportMode.equalsIgnoreCase("http"))) { - thriftCLIService = new ThriftHttpCLIService(cliService); + if (transportMode != null && (transportMode.equalsIgnoreCase("http"))) { + return true; } - else { - thriftCLIService = new ThriftBinaryCLIService(cliService); + return false; + } + + /** + * Adds a server instance to ZooKeeper as a znode. 
+ * + * @param hiveConf + * @throws Exception + */ + private void addServerInstanceToZooKeeper(HiveConf hiveConf) throws Exception { + int zooKeeperSessionTimeout = + hiveConf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT); + String zooKeeperEnsemble = ZooKeeperHiveHelper.getQuorumServers(hiveConf); + String rootNamespace = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE); + String instanceURI = getServerInstanceURI(hiveConf); + byte[] znodeDataUTF8 = instanceURI.getBytes(Charset.forName("UTF-8")); + zooKeeperClient = + new ZooKeeper(zooKeeperEnsemble, zooKeeperSessionTimeout, + new ZooKeeperHiveHelper.DummyWatcher()); + + // Create the parent znodes recursively; ignore if the parent already exists + try { + ZooKeeperHiveHelper.createPathRecursively(zooKeeperClient, rootNamespace, + Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + LOG.info("Created the root name space: " + rootNamespace + " on ZooKeeper for HiveServer2"); + } catch (KeeperException e) { + if (e.code() != KeeperException.Code.NODEEXISTS) { + LOG.fatal("Unable to create HiveServer2 namespace: " + rootNamespace + " on ZooKeeper", e); + throw (e); + } } + // Create a znode under the rootNamespace parent for this instance of the server + // Znode name: server-host:port-versionInfo-sequence + try { + String znodePath = + ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace + + ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + "server-" + instanceURI + "-" + + HiveVersionInfo.getVersion() + "-"; + znodePath = + zooKeeperClient.create(znodePath, znodeDataUTF8, Ids.OPEN_ACL_UNSAFE, + CreateMode.EPHEMERAL_SEQUENTIAL); + setRegisteredWithZooKeeper(true); + // Set a watch on the znode + if (zooKeeperClient.exists(znodePath, new DeRegisterWatcher()) == null) { + // No node exists, throw exception + throw new Exception("Unable to create znode for this HiveServer2 instance on ZooKeeper."); + } + LOG.info("Created a znode on ZooKeeper for HiveServer2 uri: " + instanceURI); + } 
catch (KeeperException e) { + LOG.fatal("Unable to create a znode for this server instance", e); + throw new Exception(e); + } + } - addService(thriftCLIService); - super.init(hiveConf); + /** + * The watcher class which sets the de-register flag when the znode corresponding to this server + * instance is deleted. Additionally, it shuts down the server if there are no more active client + * sessions at the time of receiving a 'NodeDeleted' notification from ZooKeeper. + */ + private class DeRegisterWatcher implements Watcher { + public void process(WatchedEvent event) { + if (event.getType().equals(Watcher.Event.EventType.NodeDeleted)) { + HiveServer2.this.setRegisteredWithZooKeeper(false); + // If there are no more active client sessions, stop the server + if (cliService.getSessionManager().getOpenSessionCount() == 0) { + LOG.warn("This instance of HiveServer2 has been removed from the list of server " + + "instances available for dynamic service discovery. " + + "The last client session has ended - will shutdown now."); + HiveServer2.this.stop(); + } + LOG.warn("This HiveServer2 instance is now de-registered from ZooKeeper. 
" + + "The server will be shut down after the last client sesssion completes."); + } + } + } + + private void removeServerInstanceFromZooKeeper() throws Exception { + setRegisteredWithZooKeeper(false); + zooKeeperClient.close(); + LOG.info("Server instance removed from ZooKeeper."); + } + + public boolean isRegisteredWithZooKeeper() { + return registeredWithZooKeeper; + } + + private void setRegisteredWithZooKeeper(boolean registeredWithZooKeeper) { + this.registeredWithZooKeeper = registeredWithZooKeeper; + } + + private String getServerInstanceURI(HiveConf hiveConf) throws Exception { + if ((thriftCLIService == null) || (thriftCLIService.getServerAddress() == null)) { + throw new Exception("Unable to get the server address; it hasn't been initialized yet."); + } + return thriftCLIService.getServerAddress().getHostName() + ":" + + thriftCLIService.getPortNumber(); } @Override @@ -75,23 +198,32 @@ public synchronized void start() { @Override public synchronized void stop() { - super.stop(); - // there should already be an instance of the session pool manager. - // if not, ignoring is fine while stopping the hive server. + LOG.info("Shutting down HiveServer2"); HiveConf hiveConf = this.getHiveConf(); + super.stop(); + // Remove this server instance from ZooKeeper if dynamic service discovery is set + if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY)) { + try { + removeServerInstanceFromZooKeeper(); + } catch (Exception e) { + LOG.error("Error removing znode for this HiveServer2 instance from ZooKeeper.", e); + } + } + // There should already be an instance of the session pool manager. + // If not, ignoring is fine while stopping HiveServer2. 
if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS)) { try { TezSessionPoolManager.getInstance().stop(); } catch (Exception e) { - LOG.error("Tez session pool manager stop had an error during stop of hive server"); - e.printStackTrace(); + LOG.error("Tez session pool manager stop had an error during stop of HiveServer2. " + + "Shutting down HiveServer2 anyway.", e); } } } private static void startHiveServer2() throws Throwable { long attempts = 0, maxAttempts = 1; - while(true) { + while (true) { HiveConf hiveConf = new HiveConf(); maxAttempts = hiveConf.getLongVar(HiveConf.ConfVars.HIVE_SERVER2_MAX_START_ATTEMPTS); HiveServer2 server = null; @@ -99,6 +231,11 @@ private static void startHiveServer2() throws Throwable { server = new HiveServer2(); server.init(hiveConf); server.start(); + // If we're supporting dynamic service discovery, we'll add the service uri for this + // HiveServer2 instance to Zookeeper as a znode. + if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY)) { + server.addServerInstanceToZooKeeper(hiveConf); + } if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS)) { TezSessionPoolManager sessionPool = TezSessionPoolManager.getInstance(); sessionPool.setupPool(hiveConf); @@ -106,19 +243,19 @@ private static void startHiveServer2() throws Throwable { } break; } catch (Throwable throwable) { - if(++attempts >= maxAttempts) { + if (++attempts >= maxAttempts) { throw new Error("Max start attempts " + maxAttempts + " exhausted", throwable); } else { - LOG.warn("Error starting HiveServer2 on attempt " + attempts + - ", will retry in 60 seconds", throwable); + LOG.warn("Error starting HiveServer2 on attempt " + attempts + + ", will retry in 60 seconds", throwable); try { if (server != null) { server.stop(); server = null; } } catch (Exception e) { - LOG.info("Exception caught when calling stop of HiveServer2 before" + - " retrying start", e); + LOG.info( + "Exception caught 
when calling stop of HiveServer2 before" + " retrying start", e); } try { Thread.sleep(60L * 1000L); @@ -139,14 +276,15 @@ public static void main(String[] args) { System.exit(-1); } - //NOTE: It is critical to do this here so that log4j is reinitialized + // NOTE: It is critical to do this here so that log4j is reinitialized // before any of the other core hive classes are loaded String initLog4jMessage = LogUtils.initHiveLog4j(); LOG.debug(initLog4jMessage); HiveStringUtils.startupShutdownMessage(HiveServer2.class, args, LOG); - //log debug message from "oproc" after log4j initialize properly + // log debug message from "oproc" after log4j initialize properly LOG.debug(oproc.getDebugMessage().toString()); + startHiveServer2(); } catch (LogInitializationException e) { LOG.error("Error initializing log: " + e.getMessage(), e); @@ -156,6 +294,5 @@ public static void main(String[] args) { System.exit(-1); } } - } diff --git a/service/src/test/org/apache/hive/service/cli/session/TestSessionGlobalInitFile.java b/service/src/test/org/apache/hive/service/cli/session/TestSessionGlobalInitFile.java index 66fc1fc..5b1cbc0 100644 --- a/service/src/test/org/apache/hive/service/cli/session/TestSessionGlobalInitFile.java +++ b/service/src/test/org/apache/hive/service/cli/session/TestSessionGlobalInitFile.java @@ -27,7 +27,11 @@ import org.apache.commons.io.FileUtils; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hive.service.cli.*; +import org.apache.hive.service.cli.CLIService; +import org.apache.hive.service.cli.ICLIService; +import org.apache.hive.service.cli.OperationHandle; +import org.apache.hive.service.cli.RowSet; +import org.apache.hive.service.cli.SessionHandle; import org.apache.hive.service.cli.thrift.ThriftBinaryCLIService; import org.apache.hive.service.cli.thrift.ThriftCLIServiceClient; import org.junit.After; @@ -83,7 +87,7 @@ public void setUp() throws Exception { // set up service and client HiveConf hiveConf = new HiveConf(); - 
hiveConf.setVar(HiveConf.ConfVars.HIVE_GLOBAL_INIT_FILE_LOCATION, + hiveConf.setVar(HiveConf.ConfVars.HIVE_SERVER2_GLOBAL_INIT_FILE_LOCATION, initFile.getParentFile().getAbsolutePath()); service = new FakeEmbeddedThriftBinaryCLIService(hiveConf); service.init(new HiveConf());