diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java index 7243648..b0fa98f 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java @@ -2734,7 +2734,33 @@ public void setAutoCommitOnClosedConnection() throws Exception { @Test public void testSelectExecAsync() throws Exception { HiveStatement stmt = (HiveStatement) con.createStatement(); - ResultSet rs; + testSelect(stmt); + stmt.close(); + } + + @Test + public void testSelectExecAsync2() throws Exception { + HiveStatement stmt = (HiveStatement) con.createStatement(); + + stmt.execute("SET hive.driver.parallel.compilation=true"); + stmt.execute("SET hive.server2.async.exec.async.compile=true"); + + testSelect(stmt); + stmt.close(); + } + + @Test + public void testSelectExecAsync3() throws Exception { + HiveStatement stmt = (HiveStatement) con.createStatement(); + + stmt.execute("SET hive.driver.parallel.compilation=true"); + stmt.execute("SET hive.server2.async.exec.async.compile=false"); + + testSelect(stmt); + stmt.close(); + } + + private void testSelect(HiveStatement stmt) throws SQLException { // Expected row count of the join query we'll run int expectedCount = 1028; int rowCount = 0; @@ -2742,7 +2768,7 @@ public void testSelectExecAsync() throws Exception { stmt.executeAsync("select t1.value as v11, " + "t2.value as v12 from " + tableName + " t1 join " + tableName + " t2 on t1.under_col = t2.under_col"); assertTrue(isResulSet); - rs = stmt.getResultSet(); + ResultSet rs = stmt.getResultSet(); assertNotNull(rs); // ResultSet#next blocks until the async query is complete while (rs.next()) { @@ -2751,7 +2777,6 @@ public void testSelectExecAsync() throws Exception { assertNotNull(value); } assertEquals(rowCount, expectedCount); - stmt.close(); } /** @@ -2789,6 +2814,33 @@ public void testCreateTableExecAsync() throws Exception { @Test public void testInsertOverwriteExecAsync() throws Exception { HiveStatement stmt = (HiveStatement) con.createStatement(); + testInsertOverwrite(stmt); + stmt.close(); + } + + @Test + public void testInsertOverwriteExecAsync2() throws Exception { + HiveStatement stmt = (HiveStatement) con.createStatement(); + + stmt.execute("SET hive.driver.parallel.compilation=true"); + stmt.execute("SET hive.server2.async.exec.async.compile=true"); + + testInsertOverwrite(stmt); + stmt.close(); + } + + @Test + public void testInsertOverwriteExecAsync3() throws Exception { + HiveStatement stmt = (HiveStatement) con.createStatement(); + + stmt.execute("SET hive.driver.parallel.compilation=true"); + stmt.execute("SET hive.server2.async.exec.async.compile=false"); + + testInsertOverwrite(stmt); + stmt.close(); + } + + private void testInsertOverwrite(HiveStatement stmt) throws SQLException { String tblName = "testInsertOverwriteExecAsync"; int rowCount = 0; stmt.execute("create table " + tblName + " (col1 int , col2 string)"); @@ -2807,6 +2859,5 @@ public void testInsertOverwriteExecAsync() throws Exception { } assertEquals(rowCount, dataFileRowCount); stmt.execute("drop table " + tblName); - stmt.close(); } } diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java index c4784c3..a242501 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java +++ b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java @@ -278,7 +278,8 @@ public boolean execute(String sql) throws SQLException { */ public boolean executeAsync(String sql) throws SQLException { runAsyncOnServer(sql); - if (!stmtHandle.isHasResultSet()) { + TGetOperationStatusResp status = waitForResultSetStatus(); + if (!status.isHasResultSet()) { return false; } resultSet = @@ -318,6 +319,27 @@ private void runAsyncOnServer(String sql) throws SQLException { } } + /** + * Poll the result set status by checking if isSetHasResultSet is set + * @return + * @throws SQLException + */ + private TGetOperationStatusResp waitForResultSetStatus() throws SQLException { + TGetOperationStatusReq statusReq = new TGetOperationStatusReq(stmtHandle); + TGetOperationStatusResp statusResp = null; + + while(statusResp == null || !statusResp.isSetHasResultSet()) { + try { + statusResp = client.GetOperationStatus(statusReq); + } catch (TException e) { + isLogBeingGenerated = false; + throw new SQLException(e.toString(), "08S01", e); + } + } + + return statusResp; + } + TGetOperationStatusResp waitForOperationToComplete() throws SQLException { TGetOperationStatusReq statusReq = new TGetOperationStatusReq(stmtHandle); TGetOperationStatusResp statusResp = null;