diff --git jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java
index cbcfec7..5e70f95 100644
--- jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java
+++ jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java
@@ -18,9 +18,15 @@
 
 package org.apache.hive.jdbc;
 
+import java.io.Closeable;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
 import java.security.KeyStore;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
 import java.sql.Array;
 import java.sql.Blob;
 import java.sql.CallableStatement;
@@ -52,7 +58,9 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hive.service.auth.HiveAuthFactory;
 import org.apache.hive.service.auth.KerberosSaslHelper;
 import org.apache.hive.service.auth.PlainSaslHelper;
@@ -104,6 +112,9 @@
   private static final String HIVE_SERVER2_THRIFT_HTTP_PATH = "hive.server2.thrift.http.path";
   private static final String HIVE_VAR_PREFIX = "hivevar:";
   private static final String HIVE_CONF_PREFIX = "hiveconf:";
+
+  private static final String HIVE_JDBC_AUTO_RECONNECT = "autoReconnect";
+
   // Currently supports JKS keystore format
   // See HIVE-6286 (Add support for PKCS12 keystore format)
   private static final String HIVE_SSL_TRUST_STORE_TYPE = "JKS";
@@ -115,7 +126,6 @@
   private final Map hiveConfMap;
   private final Map hiveVarMap;
   private final boolean isEmbeddedMode;
-  private TTransport transport;
   private TCLIService.Iface client; // todo should be replaced by CliServiceClient
   private boolean isClosed = true;
   private SQLWarning warningChain = null;
@@ -173,10 +183,8 @@ public HiveConnection(String uri, Properties info) throws SQLException {
       if (info.containsKey(HIVE_AUTH_TYPE)) {
         sessConfMap.put(HIVE_AUTH_TYPE, info.getProperty(HIVE_AUTH_TYPE));
       }
-      // open the client transport
-      openTransport();
       // set up the client
-      client = new TCLIService.Client(new TBinaryProtocol(transport));
+      client = setupClient(jdbcURI);
     }
 
     // add supported protocols
@@ -192,9 +200,9 @@ public HiveConnection(String uri, Properties info) throws SQLException {
     openSession(connParams);
   }
 
-  private void openTransport() throws SQLException {
+  private TTransport openTransport(String jdbcURI) throws SQLException {
     // TODO: Refactor transport creation to a factory, it's getting uber messy here
-    transport = isHttpTransportMode() ? createHttpTransport() : createBinaryTransport();
+    TTransport transport = isHttpTransportMode() ? createHttpTransport() : createBinaryTransport();
     try {
       if (!transport.isOpen()) {
         transport.open();
@@ -203,6 +211,7 @@ private void openTransport() throws SQLException {
       throw new SQLException("Could not open connection to "
           + jdbcURI + ": " + e.getMessage(), " 08S01", e);
     }
+    return transport;
   }
 
   private String getServerHttpUrl(boolean useSsl) {
@@ -236,14 +245,13 @@ private TTransport createHttpTransport() throws SQLException {
     }
 
     try {
-      transport = new THttpClient(getServerHttpUrl(useSsl), httpClient);
+      return new THttpClient(getServerHttpUrl(useSsl), httpClient);
     }
     catch (TTransportException e) {
       String msg = "Could not create http connection to " +
           jdbcURI + ". " + e.getMessage();
       throw new SQLException(msg, " 08S01", e);
     }
-    return transport;
   }
 
   private DefaultHttpClient getHttpClient(Boolean useSsl) throws SQLException {
@@ -318,13 +326,16 @@ private DefaultHttpClient getHttpClient(Boolean useSsl) throws SQLException {
    * Kerberos and Delegation token supports SASL QOP configurations
    */
   private TTransport createBinaryTransport() throws SQLException {
+    boolean success = false;
+    TTransport transport = null;
     try {
       // handle secure connection if specified
       if (!HIVE_AUTH_SIMPLE.equals(sessConfMap.get(HIVE_AUTH_TYPE))) {
         // If Kerberos
         Map saslProps = new HashMap();
         SaslQOP saslQOP = SaslQOP.AUTH;
-        if (sessConfMap.containsKey(HIVE_AUTH_PRINCIPAL)) {
+        String principal = sessConfMap.get(HIVE_AUTH_PRINCIPAL);
+        if (principal != null && !principal.isEmpty()) {
           if (sessConfMap.containsKey(HIVE_AUTH_QOP)) {
             try {
               saslQOP = SaslQOP.fromString(sessConfMap.get(HIVE_AUTH_QOP));
@@ -335,16 +346,17 @@ private TTransport createBinaryTransport() throws SQLException {
           }
           saslProps.put(Sasl.QOP, saslQOP.toString());
           saslProps.put(Sasl.SERVER_AUTH, "true");
-          boolean assumeSubject = HIVE_AUTH_KERBEROS_AUTH_TYPE_FROM_SUBJECT.equals(sessConfMap.get(HIVE_AUTH_KERBEROS_AUTH_TYPE));
+          boolean assumeSubject = HIVE_AUTH_KERBEROS_AUTH_TYPE_FROM_SUBJECT.equals(
+              sessConfMap.get(HIVE_AUTH_KERBEROS_AUTH_TYPE));
+          transport = HiveAuthFactory.getSocketTransport(host, port, loginTimeout);
           transport = KerberosSaslHelper.getKerberosTransport(
-              sessConfMap.get(HIVE_AUTH_PRINCIPAL), host,
-              HiveAuthFactory.getSocketTransport(host, port, loginTimeout), saslProps, assumeSubject);
+              principal, host, transport, saslProps, assumeSubject);
         } else {
           // If there's a delegation token available then use token based connection
           String tokenStr = getClientDelegationToken(sessConfMap);
           if (tokenStr != null) {
-            transport = KerberosSaslHelper.getTokenTransport(tokenStr,
-                host, HiveAuthFactory.getSocketTransport(host, port, loginTimeout), saslProps);
+            transport = HiveAuthFactory.getSocketTransport(host, port, loginTimeout);
+            transport = KerberosSaslHelper.getTokenTransport(tokenStr, host, transport, saslProps);
           } else {
             // we are using PLAIN Sasl connection with user/password
             String userName = getUserName();
@@ -371,12 +383,17 @@ private TTransport createBinaryTransport() throws SQLException {
         // Raw socket connection (non-sasl)
         transport = HiveAuthFactory.getSocketTransport(host, port, loginTimeout);
       }
+      success = true;
     } catch (SaslException e) {
       throw new SQLException("Could not create secure connection to "
           + jdbcURI + ": " + e.getMessage(), " 08S01", e);
     } catch (TTransportException e) {
       throw new SQLException("Could not create connection to "
           + jdbcURI + ": " + e.getMessage(), " 08S01", e);
+    } finally {
+      if (!success && transport != null && transport.isOpen()) {
+        transport.close();
+      }
     }
     return transport;
   }
@@ -397,6 +414,92 @@ private String getClientDelegationToken(Map jdbcConnConf)
     return tokenStr;
   }
 
+  /**
+   * Binary protocol whose transport can be replaced after construction, so the same
+   * Thrift client can keep being used once a transport has been discarded and reopened.
+   */
+  private static class HiveProtocol extends TBinaryProtocol {
+    public HiveProtocol(TTransport trans) {
+      super(trans);
+    }
+    public void setTransport(TTransport transport) {
+      this.trans_ = transport;
+      this.readLength_ = 0;
+      this.checkReadLength_ = false;
+    }
+  }
+
+  /**
+   * Wraps the Thrift client in a dynamic proxy that owns the transport. The proxy closes
+   * the transport when {@link Closeable#close()} is invoked on it, discards the transport
+   * after a {@link TTransportException}, and, only when the "autoReconnect" session value
+   * is "true", opens a new transport before the next call instead of failing with NOT_OPEN.
+   */
+  private TCLIService.Iface setupClient(final String jdbcURI) throws SQLException {
+
+    final HiveProtocol protocol = new HiveProtocol(null);
+    final TCLIService.Client client = new TCLIService.Client(protocol);
+    final boolean reconnect =
+        "true".equalsIgnoreCase(getSessionValue(HIVE_JDBC_AUTO_RECONNECT, null));
+
+    InvocationHandler handler = new InvocationHandler() {
+      public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+        if (method.getDeclaringClass() == Closeable.class) {
+          // close() must never reconnect nor reach the Thrift client, which is not Closeable
+          if (protocol.getTransport() != null) {
+            protocol.getTransport().close();
+          }
+          return null;
+        }
+        if (protocol.getTransport() == null) {
+          if (!reconnect) {
+            throw new TTransportException(TTransportException.NOT_OPEN);
+          }
+          protocol.setTransport(openTransport(jdbcURI));
+        }
+        try {
+          return method.invoke(client, args);
+        } catch (InvocationTargetException e) {
+          if (e.getTargetException() instanceof TTransportException) {
+            closeAsync(protocol.getTransport(), 3000); // close may block, so bound the wait
+            protocol.setTransport(null);
+          }
+          throw e.getTargetException();
+        } catch (UndeclaredThrowableException e) {
+          throw e.getUndeclaredThrowable();
+        }
+      }
+    };
+    if (!reconnect) {
+      protocol.setTransport(openTransport(jdbcURI));
+    }
+    return (TCLIService.Iface) Proxy.newProxyInstance(
+        JavaUtils.getClassLoader(), new Class[]{TCLIService.Iface.class, Closeable.class}, handler);
+  }
+
+  /**
+   * Closes the transport on a daemon thread and waits at most {@code waitMsec} milliseconds
+   * for that thread to finish, so the caller is not held up longer than that by the close.
+   */
+  private static void closeAsync(final TTransport transport, long waitMsec) {
+    Thread closer = new Thread() {
+      public void run() {
+        try {
+          transport.close();
+        } catch (Exception e) {
+          // ignore
+        }
+      }
+    };
+    closer.setDaemon(true);
+    closer.start();
+    try {
+      closer.join(waitMsec);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt(); // stop waiting, but keep the interrupt visible
+    }
+  }
+
   private void openSession(Utils.JdbcConnectionParams connParams) throws SQLException {
     TOpenSessionReq openReq = new TOpenSessionReq();
 
@@ -564,8 +667,8 @@ public void close() throws SQLException {
         throw new SQLException("Error while cleaning up the server resources", e);
       } finally {
         isClosed = true;
-        if (transport != null) {
-          transport.close();
+        if (client instanceof Closeable) {
+          IOUtils.closeStream((Closeable) client);
         }
       }
     }