From 1366ff828dbde5c2c261ffb92978cb85bc4f7e56 Mon Sep 17 00:00:00 2001 From: Azrael Park Date: Sat, 10 Aug 2013 03:16:19 +0900 Subject: [PATCH] HIVE-4901: Connection should be closed when Statement#execute() failed byTTransportException --- .../java/org/apache/hive/jdbc/HiveConnection.java | 17 +- .../apache/hive/jdbc/HivePreparedStatement.java | 19 +- .../java/org/apache/hive/jdbc/HiveStatement.java | 20 +- .../hive/jdbc/TestJdbcDriver2Connection.java | 201 +++++++++++++++++++++ 4 files changed, 251 insertions(+), 6 deletions(-) create mode 100644 jdbc/src/test/org/apache/hive/jdbc/TestJdbcDriver2Connection.java diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java index 9fbc8ad..167b31d 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java +++ b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java @@ -314,7 +314,7 @@ public Statement createStatement() throws SQLException { if (isClosed) { throw new SQLException("Can't create Statement, connection is closed"); } - return new HiveStatement(client, sessHandle); + return new HiveStatement(client, sessHandle, this); } /* @@ -542,7 +542,10 @@ public CallableStatement prepareCall(String sql, int resultSetType, */ public PreparedStatement prepareStatement(String sql) throws SQLException { - return new HivePreparedStatement(client, sessHandle, sql); + if (isClosed) { + throw new SQLException("Can't create Statement, connection is closed"); + } + return new HivePreparedStatement(client, sessHandle, sql, this); } /* @@ -553,7 +556,10 @@ public PreparedStatement prepareStatement(String sql) throws SQLException { public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException { - return new HivePreparedStatement(client, sessHandle, sql); + if (isClosed) { + throw new SQLException("Can't create Statement, connection is closed"); + } + return new HivePreparedStatement(client, sessHandle, sql, this); } /* @@ -589,7 +595,10 @@ public PreparedStatement prepareStatement(String sql, String[] columnNames) public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException { - return new HivePreparedStatement(client, sessHandle, sql); + if (isClosed) { + throw new SQLException("Can't create Statement, connection is closed"); + } + return new HivePreparedStatement(client, sessHandle, sql, this); } /* diff --git a/jdbc/src/java/org/apache/hive/jdbc/HivePreparedStatement.java b/jdbc/src/java/org/apache/hive/jdbc/HivePreparedStatement.java index dfcd536..ff970d3 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/HivePreparedStatement.java +++ b/jdbc/src/java/org/apache/hive/jdbc/HivePreparedStatement.java @@ -48,6 +48,7 @@ import org.apache.hive.service.cli.thrift.TExecuteStatementResp; import org.apache.hive.service.cli.thrift.TOperationHandle; import org.apache.hive.service.cli.thrift.TSessionHandle; +import org.apache.thrift.transport.TTransportException; /** * HivePreparedStatement. @@ -58,6 +59,7 @@ private TCLIService.Iface client; private final TSessionHandle sessHandle; private TOperationHandle stmtHandle; + private HiveConnection connection; Map sessConf = new HashMap(); /** @@ -97,12 +99,20 @@ * */ public HivePreparedStatement(TCLIService.Iface client, TSessionHandle sessHandle, - String sql) { + String sql, HiveConnection connection) { this.client = client; this.sessHandle = sessHandle; this.sql = sql; + this.connection = connection; } + private void closeConnection(){ + try { + connection.close(); + } catch (SQLException eS){ + // need to print trace? + } + } /* * (non-Javadoc) * @@ -187,8 +197,15 @@ protected ResultSet executeImmediate(String sql) throws SQLException { Utils.verifySuccessWithInfo(execResp.getStatus()); stmtHandle = execResp.getOperationHandle(); } catch (SQLException es) { + if(es.getCause() instanceof TTransportException){ + closeConnection(); + } throw es; + }catch (TTransportException ex){ + closeConnection(); + throw new SQLException(ex.toString(), "08S01", ex); } catch (Exception ex) { + throw new SQLException(ex.toString(), "08S01", ex); } resultSet = new HiveQueryResultSet.Builder().setClient(client).setSessionHandle(sessHandle) diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java index 982ceb8..46de1ba 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java +++ b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java @@ -34,6 +34,7 @@ import org.apache.hive.service.cli.thrift.TExecuteStatementResp; import org.apache.hive.service.cli.thrift.TOperationHandle; import org.apache.hive.service.cli.thrift.TSessionHandle; +import org.apache.thrift.transport.TTransportException; /** * HiveStatement. @@ -43,6 +44,7 @@ private TCLIService.Iface client; private TOperationHandle stmtHandle; private final TSessionHandle sessHandle; + private HiveConnection connection; Map sessConf = new HashMap(); private int fetchSize = 50; /** @@ -74,9 +76,10 @@ /** * */ - public HiveStatement(TCLIService.Iface client, TSessionHandle sessHandle) { + public HiveStatement(TCLIService.Iface client, TSessionHandle sessHandle, HiveConnection connection) { this.client = client; this.sessHandle = sessHandle; + this.connection = connection; } /* @@ -168,6 +171,14 @@ public void closeOnCompletion() throws SQLException { throw new SQLException("Method not supported"); } + private void closeConnection(){ + try { + connection.close(); + } catch (SQLException eS){ + // need to print trace? + } + } + /* * (non-Javadoc) * @@ -187,7 +198,13 @@ public boolean execute(String sql) throws SQLException { Utils.verifySuccessWithInfo(execResp.getStatus()); stmtHandle = execResp.getOperationHandle(); } catch (SQLException eS) { + if(eS.getCause() instanceof TTransportException){ + closeConnection(); + } throw eS; + }catch (TTransportException ex){ + closeConnection(); + throw new SQLException(ex.toString(), "08S01", ex); } catch (Exception ex) { throw new SQLException(ex.toString(), "08S01", ex); } @@ -195,6 +212,7 @@ public boolean execute(String sql) throws SQLException { if (!stmtHandle.isHasResultSet()) { return false; } + resultSet = new HiveQueryResultSet.Builder().setClient(client).setSessionHandle(sessHandle) .setStmtHandle(stmtHandle).setMaxRows(maxRows).setFetchSize(fetchSize) .build(); diff --git a/jdbc/src/test/org/apache/hive/jdbc/TestJdbcDriver2Connection.java b/jdbc/src/test/org/apache/hive/jdbc/TestJdbcDriver2Connection.java new file mode 100644 index 0000000..b13c94a --- /dev/null +++ b/jdbc/src/test/org/apache/hive/jdbc/TestJdbcDriver2Connection.java @@ -0,0 +1,201 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.jdbc; + +import junit.framework.Assert; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hive.service.server.HiveServer2; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.Statement; + + +public class TestJdbcDriver2Connection { + private static HiveServer2 hiveserver2 = null; + + private static String driverName = "org.apache.hive.jdbc.HiveDriver"; + private static String tableName = "testHiveJdbcDriver_Table"; + private static HiveConf conf; + private static Path dataFilePath; + private static Path dataTypeDataFilePath; + private static Connection con; + + public TestJdbcDriver2Connection() { + + } + + @BeforeClass + public static void setUp() throws Exception { + + conf = new HiveConf(TestJdbcDriver2Connection.class); + String dataFileDir = conf.get("test.data.files").replace('\\', '/').replace("c:", ""); + dataFilePath = new Path(dataFileDir, "kv1.txt"); + dataTypeDataFilePath = new Path(dataFileDir, "datatypes.txt"); + + startServer(); + setupData(); + } + + @AfterClass + public static void tearDown() throws Exception { + clearData(); + stopServer(); + } + + private static void startServer() throws Exception{ + + HiveConf hiveConf = new HiveConf(); + hiveserver2 = new HiveServer2(); + hiveserver2.init(hiveConf); + hiveserver2.start(); + Thread.sleep(1000); + System.out.println("hiveServer2 start ......"); + + } + + private static void stopServer(){ + try{ + if(hiveserver2 !=null){ + hiveserver2.stop(); + hiveserver2 = null; + } + Thread.sleep(1000); + }catch (Exception e){ + e.printStackTrace(); + } + System.out.println("hiveServer2 stop ......"); + } + + private static void setupData() throws Exception{ + + String url = "jdbc:hive2://localhost:10000/default"; + con = createConnection(url); + + Assert.assertNotNull("Connection is null", con); + Assert.assertFalse("Connection should not be closed", con.isClosed()); + Statement stmt = con.createStatement(); + Assert.assertNotNull("Statement is null", stmt); + + stmt.execute("set hive.support.concurrency = false"); + + // drop table. ignore error. + try { + stmt.execute("drop table " + tableName); + } catch (Exception ex) { + Assert.fail(ex.toString()); + } + + // create table + stmt.execute("create table " + tableName + " (under_col int comment 'the under column', value string) "); + + // load data + stmt.execute("load data local inpath '" + dataFilePath.toString() + "' into table " + tableName); + + } + + private static void clearData() throws Exception{ + try{ + executeQuery(con, "drop table "+tableName); + }catch (Exception e){ + System.err.println(e.getMessage()); + }finally { + con.close(); + } + + } + + @Test + public void testConnectionClose() throws Exception{ + + String url = "jdbc:hive2://localhost:10000/default"; + Connection connection = createConnection(url); + Assert.assertNotNull(connection); + + stopServer(); + + // connection will be closed + Exception exception = null; + try{ + executeQuery(connection, "show tables"); + } catch (Exception e){ + exception = e; + } + Assert.assertNotNull(exception); + + startServer(); + + //reconnect + exception = null; + try{ + executeQuery(connection, "show tables"); + } catch (Exception e){ + exception = e; + } + Assert.assertNotNull("fail to reconnect", exception); + System.out.println("exception : "+exception.getMessage()); + Assert.assertTrue(exception.getMessage().contains("connection is closed")); + + //PreparedStatement + String preparedSql = "show tables"; + try { + PreparedStatement ps = connection.prepareStatement(preparedSql); + ps.executeUpdate(); + ps.close(); + + } catch (Exception e) { + exception = e; + } + System.out.println("exception : "+exception.getMessage()); + Assert.assertTrue(exception.getMessage().contains("connection is closed")); + + System.out.println(">>> PASSED testConnectionClose"); + + } + + + private static void executeQuery(Connection con, String sql) throws Exception{ + Statement stmt = con.createStatement(); + stmt.execute("set hive.support.concurrency = false"); + boolean result = stmt.execute(sql); + if(result){ + ResultSet rs = stmt.getResultSet(); + Assert.assertNotNull(rs); + rs.close(); + } + stmt.close(); + + } + + private static Connection createConnection(String url) throws Exception{ + + Class.forName(driverName); + Connection connection = DriverManager.getConnection(url,System.getProperty("user.name"), ""); + return connection; + + } + + +} -- 1.8.2.1