From d6993ef565fddd2a43495ed31db3865213b28c08 Mon Sep 17 00:00:00 2001 From: Azrael Park Date: Sat, 10 Aug 2013 14:39:04 +0900 Subject: [PATCH] HIVE-5039: Support autoReconnect at JDBC --- .../java/org/apache/hive/jdbc/HiveConnection.java | 261 ++++++++++++++++++--- .../apache/hive/jdbc/HivePreparedStatement.java | 5 +- .../java/org/apache/hive/jdbc/HiveStatement.java | 58 ++++- .../hive/jdbc/TestJdbcDriver2Connection.java | 211 +++++++++++++++++ 4 files changed, 494 insertions(+), 41 deletions(-) create mode 100644 jdbc/src/test/org/apache/hive/jdbc/TestJdbcDriver2Connection.java diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java index 9fbc8ad..553f98e 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java +++ b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java @@ -51,6 +51,8 @@ import org.apache.hive.service.cli.thrift.EmbeddedThriftCLIService; import org.apache.hive.service.cli.thrift.TCLIService; import org.apache.hive.service.cli.thrift.TCloseSessionReq; +import org.apache.hive.service.cli.thrift.TExecuteStatementReq; +import org.apache.hive.service.cli.thrift.TExecuteStatementResp; import org.apache.hive.service.cli.thrift.TOpenSessionReq; import org.apache.hive.service.cli.thrift.TOpenSessionResp; import org.apache.hive.service.cli.thrift.TProtocolVersion; @@ -76,38 +78,99 @@ private static final String HIVE_ANONYMOUS_USER = "anonymous"; private static final String HIVE_ANONYMOUS_PASSWD = "anonymous"; - private TTransport transport; - private TCLIService.Iface client; + private static final int RETRY_COUNT = 3; + + private CoreConnection coreConnection; private boolean isClosed = true; private SQLWarning warningChain = null; - private TSessionHandle sessHandle = null; private final List supportedProtocols = new LinkedList(); + + private Utils.JdbcConnectionParams connParams = null; + private boolean autoReconnect = false; + private boolean needPing = false; /** * TODO: - parse uri (use java.net.URI?). */ public HiveConnection(String uri, Properties info) throws SQLException { - Utils.JdbcConnectionParams connParams = Utils.parseURL(uri); + connParams = Utils.parseURL(uri); + if(connParams.getHiveConfs().containsKey("autoReconnect")){ + if(connParams.getHiveConfs().get("autoReconnect").toString().equals("true")){ + setAutoReconnect(true); + } + } + + // currently only V1 is supported + supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1); + + coreConnection = connect(connParams, info); + + configureConnection(connParams); + } + + private CoreConnection connect(Utils.JdbcConnectionParams connParams, Properties info) throws SQLException{ + CoreConnection connection = new CoreConnection(); if (connParams.isEmbeddedMode()) { - client = new EmbeddedThriftCLIService(); + TCLIService.Iface client = new EmbeddedThriftCLIService(); + connection.setClient(client); } else { // extract user/password from JDBC connection properties if its not supplied in the connection URL if (info.containsKey(HIVE_AUTH_USER)) { connParams.getSessionVars().put(HIVE_AUTH_USER, info.getProperty(HIVE_AUTH_USER)); if (info.containsKey(HIVE_AUTH_PASSWD)) { - connParams.getSessionVars().put(HIVE_AUTH_PASSWD, info.getProperty(HIVE_AUTH_PASSWD)); + connParams.getSessionVars().put(HIVE_AUTH_PASSWD, info.getProperty(HIVE_AUTH_PASSWD)); } } - openTransport(uri, connParams.getHost(), connParams.getPort(), connParams.getSessionVars()); + TTransport transport = new TSocket(connParams.getHost(), connParams.getPort()); + TCLIService.Iface client = openTransport(transport, connParams); + connection.setClient(client); + connection.setTransport(transport); } - // currently only V1 is supported - supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1); // open client session - openSession(uri); + TSessionHandle sessionHandle = openSession(connection.getClient(), connParams); + connection.setSessionHandle(sessionHandle); - configureConnection(connParams); + isClosed = false; + + return connection; + } + + public HiveConnection reconnect() throws SQLException{ + CoreConnection oldConnection = coreConnection; + CoreConnection newConnection = null; + boolean connected = false; + SQLException cause = null; + + for(int i = 0; i < RETRY_COUNT && !connected; i++){ + try { + newConnection = connect(connParams, new Properties()); + connected = true; + break; + } catch (SQLException e){ + cause = e; + } + if(!connected){ + try{ + Thread.sleep(1000); + } catch(InterruptedException e){ + + } + } + } + if(!connected){ + throw new SQLException("reconnect fail : "+cause.toString(), "08S01", cause); + } + + try{ + oldConnection.close(); + }catch(Exception e){ + //ignore error + } + coreConnection = newConnection; + + return this; } private void configureConnection(Utils.JdbcConnectionParams connParams) @@ -127,11 +190,14 @@ private void configureConnection(Utils.JdbcConnectionParams connParams) } } - private void openTransport(String uri, String host, int port, Map sessConf ) + private TCLIService.Iface openTransport(TTransport transport, Utils.JdbcConnectionParams connParams ) throws SQLException { - transport = new TSocket(host, port); + + String host = connParams.getHost(); + int port = connParams.getPort(); // handle secure connection if specified + Map sessConf = connParams.getSessionVars(); if (!sessConf.containsKey(HIVE_AUTH_TYPE) || !sessConf.get(HIVE_AUTH_TYPE).equals(HIVE_AUTH_SIMPLE)){ try { @@ -162,21 +228,22 @@ private void openTransport(String uri, String host, int port, Map iface) throws SQLException { throw new SQLException("Method not supported"); } + /** + * Detect if the connection is still good + * @throws SQLException + */ + public void ping() throws SQLException{ + String validationQuery = "set ping = ping"; + boolean ping = pingInternal(validationQuery); + } + + private boolean pingInternal(String sql) throws SQLException { + if (isClosed) { + throw new SQLException("Can't execute after connection has been closed"); + } + + try { + Map sessConf = new HashMap(); + TExecuteStatementReq execReq = new TExecuteStatementReq(coreConnection.getSessionHandle(), sql); + execReq.setConfOverlay(sessConf); + TExecuteStatementResp execResp = coreConnection.getClient().ExecuteStatement(execReq); + Utils.verifySuccessWithInfo(execResp.getStatus()); + } catch (SQLException es) { + throw es; + } catch (Exception ex) { + throw new SQLException(ex.toString(), "08S01", ex); + } + + return true; + } + + + public boolean isAutoReconnect() { + return autoReconnect; + } + + public void setAutoReconnect(boolean autoReconnect) { + this.autoReconnect = autoReconnect; + } + + /** + * Determine if the connection need ping + * @return + */ + public boolean isNeedPing() { + return needPing; + } + + public void setNeedPing(boolean needPing) { + this.needPing = needPing; + } + + public TCLIService.Iface getClient(){ + return coreConnection.getClient(); + } + + public TSessionHandle getSessionHandle(){ + return coreConnection.getSessionHandle(); + } + + class CoreConnection{ + private TTransport transport; + private TCLIService.Iface client; + private TSessionHandle sessionHandle; + + private boolean isClosed; + + CoreConnection(){ + + } + + CoreConnection(TTransport transport, TCLIService.Iface client, TSessionHandle sessionHandle){ + this.transport = transport; + this.client = client; + this.sessionHandle = this.sessionHandle; + } + + TTransport getTransport() { + return transport; + } + + void setTransport(TTransport transport) { + this.transport = transport; + } + + TCLIService.Iface getClient() { + return client; + } + + void setClient(TCLIService.Iface client) { + this.client = client; + } + + TSessionHandle getSessionHandle() { + return sessionHandle; + } + + void setSessionHandle(TSessionHandle sessionHandle) { + this.sessionHandle = sessionHandle; + } + + boolean isClosed() { + return isClosed; + } + + public void close() throws SQLException { + if (!isClosed) { + + TCloseSessionReq closeReq = new TCloseSessionReq(sessionHandle); + try { + client.CloseSession(closeReq); + } catch (TException e) { + throw new SQLException("Error while cleaning up the server resources", e); + } finally { + isClosed = true; + if (transport != null) { + transport.close(); + } + isClosed = true; + } + } + } + } } diff --git a/jdbc/src/java/org/apache/hive/jdbc/HivePreparedStatement.java b/jdbc/src/java/org/apache/hive/jdbc/HivePreparedStatement.java index 8fb23cc..1637bd3 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/HivePreparedStatement.java +++ b/jdbc/src/java/org/apache/hive/jdbc/HivePreparedStatement.java @@ -55,9 +55,8 @@ */ private final HashMap parameters=new HashMap(); - public HivePreparedStatement(TCLIService.Iface client, TSessionHandle sessHandle, - String sql) { - super(client, sessHandle); + public HivePreparedStatement(HiveConnection connection, String sql) { + super(connection); this.sql = sql; } diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java index 982ceb8..8de6b58 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java +++ b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java @@ -34,6 +34,7 @@ import org.apache.hive.service.cli.thrift.TExecuteStatementResp; import org.apache.hive.service.cli.thrift.TOperationHandle; import org.apache.hive.service.cli.thrift.TSessionHandle; +import org.apache.thrift.transport.TTransportException; /** * HiveStatement. @@ -42,7 +43,8 @@ public class HiveStatement implements java.sql.Statement { private TCLIService.Iface client; private TOperationHandle stmtHandle; - private final TSessionHandle sessHandle; + private TSessionHandle sessHandle; + private HiveConnection connection; Map sessConf = new HashMap(); private int fetchSize = 50; /** @@ -74,9 +76,14 @@ /** * */ - public HiveStatement(TCLIService.Iface client, TSessionHandle sessHandle) { - this.client = client; - this.sessHandle = sessHandle; + public HiveStatement(HiveConnection connection) { + initConnection(connection); + } + + private void initConnection(HiveConnection connection){ + this.connection = connection; + this.client = connection.getClient(); + this.sessHandle = connection.getSessionHandle(); } /* @@ -168,6 +175,36 @@ public void closeOnCompletion() throws SQLException { throw new SQLException("Method not supported"); } + /** + * Close connection if autoReconnect is not true. + */ + private void closeConnection(){ + try{ + if(connection.isAutoReconnect()){ + connection.setNeedPing(true); + }else{ + connection.close(); + } + } catch (SQLException e){ + // need to print trace? + } + } + + /** + * Check the connection is alive and reconnect if necessary. + * @throws SQLException + */ + private void checkAvailability() throws SQLException{ + if(connection.isAutoReconnect() && connection.isNeedPing()){ + try{ + connection.ping(); + connection.setNeedPing(false); + }catch(SQLException e){ + initConnection(connection.reconnect()); + } + } + } + /* * (non-Javadoc) * @@ -179,15 +216,25 @@ public boolean execute(String sql) throws SQLException { throw new SQLException("Can't execute after statement has been closed"); } + checkAvailability(); + try { closeClientOperation(); TExecuteStatementReq execReq = new TExecuteStatementReq(sessHandle, sql); execReq.setConfOverlay(sessConf); TExecuteStatementResp execResp = client.ExecuteStatement(execReq); + Utils.verifySuccessWithInfo(execResp.getStatus()); + stmtHandle = execResp.getOperationHandle(); } catch (SQLException eS) { + if(eS.getCause() instanceof TTransportException){ + closeConnection(); + } throw eS; + }catch (TTransportException ex){ + closeConnection(); + throw new SQLException(ex.toString(), "08S01", ex); } catch (Exception ex) { throw new SQLException(ex.toString(), "08S01", ex); } @@ -195,6 +242,7 @@ public boolean execute(String sql) throws SQLException { if (!stmtHandle.isHasResultSet()) { return false; } + resultSet = new HiveQueryResultSet.Builder().setClient(client).setSessionHandle(sessHandle) .setStmtHandle(stmtHandle).setMaxRows(maxRows).setFetchSize(fetchSize) .build(); @@ -302,7 +350,7 @@ public int executeUpdate(String sql, String[] columnNames) throws SQLException { */ public Connection getConnection() throws SQLException { - throw new SQLException("Method not supported"); + return connection; } /* diff --git a/jdbc/src/test/org/apache/hive/jdbc/TestJdbcDriver2Connection.java b/jdbc/src/test/org/apache/hive/jdbc/TestJdbcDriver2Connection.java new file mode 100644 index 0000000..86f0913 --- /dev/null +++ b/jdbc/src/test/org/apache/hive/jdbc/TestJdbcDriver2Connection.java @@ -0,0 +1,211 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.jdbc; + +import junit.framework.Assert; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hive.service.server.HiveServer2; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.Statement; + + +public class TestJdbcDriver2Connection { + private static HiveServer2 hiveserver2 = null; + + private static String driverName = "org.apache.hive.jdbc.HiveDriver"; + private static String tableName = "testHiveJdbcDriver_Table"; + private static HiveConf conf; + private static Path dataFilePath; + private static Path dataTypeDataFilePath; + private static Connection con; + + public TestJdbcDriver2Connection() { + + } + + @BeforeClass + public static void setUp() throws Exception { + + conf = new HiveConf(TestJdbcDriver2Connection.class); + String dataFileDir = conf.get("test.data.files").replace('\\', '/').replace("c:", ""); + dataFilePath = new Path(dataFileDir, "kv1.txt"); + dataTypeDataFilePath = new Path(dataFileDir, "datatypes.txt"); + + startServer(); + setupData(); + } + + @AfterClass + public static void tearDown() throws Exception { + clearData(); + stopServer(); + } + + private static void startServer() throws Exception{ + + HiveConf hiveConf = new HiveConf(); + hiveserver2 = new HiveServer2(); + hiveserver2.init(hiveConf); + hiveserver2.start(); + Thread.sleep(1000); + System.out.println("hiveServer2 start ......"); + + } + + private static void stopServer(){ + try{ + if(hiveserver2 !=null){ + hiveserver2.stop(); + hiveserver2 = null; + } + Thread.sleep(1000); + }catch (Exception e){ + e.printStackTrace(); + } + System.out.println("hiveServer2 stop ......"); + } + + private static void setupData() throws Exception{ + + String url = "jdbc:hive2://localhost:10000/default"; + con = createConnection(url); + + Assert.assertNotNull("Connection is null", con); + Assert.assertFalse("Connection should not be closed", con.isClosed()); + Statement stmt = con.createStatement(); + Assert.assertNotNull("Statement is null", stmt); + + stmt.execute("set hive.support.concurrency = false"); + + // drop table. ignore error. + try { + stmt.execute("drop table " + tableName); + } catch (Exception ex) { + Assert.fail(ex.toString()); + } + + // create table + stmt.execute("create table " + tableName + " (under_col int comment 'the under column', value string) "); + + // load data + stmt.execute("load data local inpath '" + dataFilePath.toString() + "' into table " + tableName); + + } + + private static void clearData() throws Exception{ + try{ + executeQuery(con, "drop table "+tableName); + }catch (Exception e){ + System.err.println(e.getMessage()); + }finally { + try{ + con.close(); + }catch (Exception e){ + e.printStackTrace(); + } + } + + } + + + @Test + public void testAutoReconnect() throws Exception{ + + String url1 = "jdbc:hive2://localhost:10000/default?autoReconnect=true"; + Connection autoConnection = createConnection(url1); + Assert.assertNotNull(autoConnection); + + stopServer(); + + Exception exception = null; + try{ + executeQuery(autoConnection, "show tables"); + } catch (Exception e){ + exception = e; + } + Assert.assertNotNull(exception); + + exception = null; + try{ + executeQuery(autoConnection, "show tables"); + } catch (Exception e){ + exception = e; + } + Assert.assertNotNull(exception); + Assert.assertTrue(exception.getMessage().contains("reconnect fail")); + + startServer(); + + //reconnect + exception = null; + try{ + executeQuery(autoConnection, "show tables"); + } catch (Exception e){ + exception = e; + } + Assert.assertNull("fail to reconnect", exception); + + //PreparedStatement + exception = null; + String preparedSql = "show tables"; + try { + PreparedStatement ps = autoConnection.prepareStatement(preparedSql); + ps.execute(); + ps.close(); + + } catch (Exception e) { + exception = e; + } + Assert.assertNull("fail to reconnect", exception); + + System.out.println(">>> PASSED testAutoReconnect"); + + } + + private static void executeQuery(Connection con, String sql) throws Exception{ + Statement stmt = con.createStatement(); + stmt.execute("set hive.support.concurrency = false"); + boolean result = stmt.execute(sql); + if(result){ + ResultSet rs = stmt.getResultSet(); + Assert.assertNotNull(rs); + rs.close(); + } + stmt.close(); + + } + + private static Connection createConnection(String url) throws Exception{ + + Class.forName(driverName); + Connection connection = DriverManager.getConnection(url,System.getProperty("user.name"), ""); + return connection; + + } + + +} -- 1.8.2.1