diff --git jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java index 8595677..02f859a 100644 --- jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java +++ jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java @@ -20,7 +20,12 @@ import java.io.FileInputStream; import java.io.IOException; +import java.lang.reflect.UndeclaredThrowableException; import java.security.KeyStore; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; import java.sql.Array; import java.sql.Blob; import java.sql.CallableStatement; @@ -52,6 +57,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hive.service.auth.HiveAuthFactory; @@ -105,6 +111,9 @@ private static final String HIVE_SERVER2_THRIFT_HTTP_PATH = "hive.server2.thrift.http.path"; private static final String HIVE_VAR_PREFIX = "hivevar:"; private static final String HIVE_CONF_PREFIX = "hiveconf:"; + + private static final String HIVE_JDBC_AUTO_RECONNECT = "autoReconnect"; + // Currently supports JKS keystore format // See HIVE-6286 (Add support for PKCS12 keystore format) private static final String HIVE_SSL_TRUST_STORE_TYPE = "JKS"; @@ -177,7 +186,7 @@ public HiveConnection(String uri, Properties info) throws SQLException { // open the client transport openTransport(); // set up the client - client = new TCLIService.Client(new TBinaryProtocol(transport)); + client = wrapClient(new HiveProtocol(transport)); } // add supported protocols @@ -195,7 +204,7 @@ public HiveConnection(String uri, Properties info) throws SQLException { configureConnection(connParams.getDbName()); } - private void openTransport() throws SQLException { + private TTransport openTransport() 
throws SQLException {
     // TODO: Refactor transport creation to a factory, it's getting uber messy here
     transport = isHttpTransportMode() ? createHttpTransport() : createBinaryTransport();
     try {
@@ -206,6 +215,7 @@ private void openTransport() throws SQLException {
       throw new SQLException("Could not open connection to "
           + jdbcURI + ": " + e.getMessage(), " 08S01", e);
     }
+    return transport;
   }
 
   private String getServerHttpUrl(boolean useSsl) {
@@ -400,6 +410,84 @@ private String getClientDelegationToken(Map jdbcConnConf)
     return tokenStr;
   }
 
+  /**
+   * Binary protocol whose transport can be replaced after construction, so that the
+   * client built on it can keep being used with a newly opened transport.
+   */
+  private static class HiveProtocol extends TBinaryProtocol {
+    public HiveProtocol(TTransport trans) {
+      super(trans);
+    }
+    public void setTransport(TTransport transport) {
+      this.trans_ = transport;
+    }
+  }
+
+  /**
+   * Creates the Thrift client for {@code protocol}. The plain client is returned unless
+   * getSessionValue() yields "true" (case-insensitive) for "autoReconnect"; in that case
+   * a proxy is returned which, once a call has failed with a TTransportException, opens
+   * a new transport before forwarding the next call.
+   * NOTE(review): only the transport is reopened here, openSession() is not called
+   * again - confirm the server side accepts the old session handle after a reconnect.
+   */
+  private TCLIService.Iface wrapClient(final HiveProtocol protocol) {
+    final TCLIService.Client client = new TCLIService.Client(protocol);
+    String reconnect = getSessionValue(HIVE_JDBC_AUTO_RECONNECT, null);
+    if (!"true".equalsIgnoreCase(reconnect)) {
+      return client;
+    }
+
+    InvocationHandler handler = new InvocationHandler() {
+      // set after a TTransportException; makes the next call reopen the transport first
+      private boolean failed;
+      public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+        if (failed) {
+          try {
+            protocol.setTransport(openTransport());
+          } catch (SQLException e) {
+            // rethrow as a Thrift exception: a SQLException is not declared by the
+            // client interface and would surface as UndeclaredThrowableException
+            throw new TTransportException(e.getMessage(), e);
+          }
+          failed = false;
+        }
+        try {
+          return method.invoke(client, args);
+        } catch (InvocationTargetException e) {
+          if (e.getTargetException() instanceof TTransportException) {
+            failed = true;
+            // close() can take a while, so it is done on a separate thread
+            closeAsync(protocol.getTransport());
+          }
+          throw e.getTargetException();
+        } catch (UndeclaredThrowableException e) {
+          throw e.getCause();
+        }
+      }
+    };
+    return (TCLIService.Iface) Proxy.newProxyInstance(
+        JavaUtils.getClassLoader(), new Class[]{TCLIService.Iface.class}, handler);
+  }
+
+  /**
+   * Closes {@code transport} on a daemon thread so the caller does not wait for it.
+   * Any exception thrown by close() is discarded.
+   */
+  private static void closeAsync(final TTransport transport) {
+    Thread closer = new Thread() {
+      public void run() {
+        try {
+          transport.close();
+        } catch (Exception e) {
+          // ignore: close is best-effort here
+        }
+      }
+    };
+    closer.setDaemon(true);
+    closer.start();
+  }
+ private void openSession(Map sessVars) throws SQLException { TOpenSessionReq openReq = new TOpenSessionReq();