diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java index 4eaff10..b4137b3 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java @@ -19,7 +19,6 @@ package org.apache.hive.jdbc; import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Sets; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.type.HiveIntervalDayTime; import org.apache.hadoop.hive.common.type.HiveIntervalYearMonth; @@ -2597,6 +2596,80 @@ public void setAutoCommitOnClosedConnection() throws Exception { } finally { mycon.close(); } + } + + /** + * Test {@link HiveStatement#executeAsync(String)} for a select query + * @throws Exception + */ + @Test + public void testSelectExecAsync() throws Exception { + HiveStatement stmt = (HiveStatement) con.createStatement(); + ResultSet rs; + System.out.println("Executing query: "); + boolean isResulSet = + stmt.executeAsync("select t1.value as v11, " + "t2.value as v12 from " + tableName + + " t1 join " + tableName + " t2 on t1.under_col = t2.under_col"); + assertTrue(isResulSet); + rs = stmt.getResultSet(); + assertNotNull(rs); + while (rs.next()) { + String value = rs.getString(2); + assertNotNull(value); + } + stmt.close(); + } + /** + * Test {@link HiveStatement#executeAsync(String)} for a create table + * @throws Exception + */ + @Test + public void testCreateTableExecAsync() throws Exception { + HiveStatement stmt = (HiveStatement) con.createStatement(); + String tblName = "testCreateTableExecAsync"; + System.out.println("Executing query: "); + boolean isResulSet = stmt.executeAsync("create table " + tblName + " (col1 int , col2 string)"); + assertFalse(isResulSet); + stmt.getUpdateCount(); + DatabaseMetaData metadata = con.getMetaData(); + ResultSet tablesMetadata = metadata.getTables(null, null, "%", null); + boolean tblFound = false; + while (tablesMetadata.next()) { + String tableName = tablesMetadata.getString(3); + if (tableName.equalsIgnoreCase(tblName)) { + tblFound = true; + } + } + if (!tblFound) { + fail("Unable to create table using executeAsync"); + } + stmt.execute("drop table " + tblName); + stmt.close(); + } + + /** + * Test {@link HiveStatement#executeAsync(String)} for an insert overwrite into a table + * @throws Exception + */ + @Test + public void testInsertOverwriteExecAsync() throws Exception { + HiveStatement stmt = (HiveStatement) con.createStatement(); + String tblName = "testInsertOverwriteExecAsync"; + System.out.println("Executing query: "); + stmt.execute("create table " + tblName + " (col1 int , col2 string)"); + boolean isResulSet = + stmt.executeAsync("insert overwrite table " + tblName + " select * from " + tableName); + assertFalse(isResulSet); + stmt.getUpdateCount(); + // Read from the new table + ResultSet rs = stmt.executeQuery("select * from " + tblName); + assertNotNull(rs); + while (rs.next()) { + String value = rs.getString(2); + assertNotNull(value); + } + stmt.execute("drop table " + tblName); + stmt.close(); } } diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveQueryResultSet.java b/jdbc/src/java/org/apache/hive/jdbc/HiveQueryResultSet.java index 8f67209..92fdbca 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/HiveQueryResultSet.java +++ b/jdbc/src/java/org/apache/hive/jdbc/HiveQueryResultSet.java @@ -347,6 +347,15 @@ public boolean next() throws SQLException { return false; } + 
/** + * Poll on the operation status, till the operation is complete. + * We need to wait only for HiveStatement to complete. + * HiveDatabaseMetaData which also uses this ResultSet returns only after the RPC is complete. + */ + if ((statement != null) && (statement instanceof HiveStatement)) { + ((HiveStatement) statement).waitForOperationToComplete(); + } + try { TFetchOrientation orientation = TFetchOrientation.FETCH_NEXT; if (fetchFirst) { diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java index b4dba44..1743553 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java +++ b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java @@ -61,6 +61,7 @@ Map<String,String> sessConf = new HashMap<String,String>(); private int fetchSize = DEFAULT_FETCH_SIZE; private boolean isScrollableResultset = false; + private boolean isOperationComplete = false; /** * We need to keep a reference to the result set to support the following: * @@ -238,6 +239,47 @@ public void closeOnCompletion() throws SQLException { @Override public boolean execute(String sql) throws SQLException { + runAsyncOnServer(sql); + waitForOperationToComplete(); + + // The query should be completed by now + if (!stmtHandle.isHasResultSet()) { + return false; + } + resultSet = new HiveQueryResultSet.Builder(this).setClient(client).setSessionHandle(sessHandle) + .setStmtHandle(stmtHandle).setMaxRows(maxRows).setFetchSize(fetchSize) + .setScrollable(isScrollableResultset) + .build(); + return true; + } + + /** + * Starts the query execution asynchronously on the server, and immediately returns to the client. + * The client subsequently blocks on ResultSet#next or Statement#getUpdateCount, depending on the + * query type. Users should call ResultSet#next or Statement#getUpdateCount (depending on whether + * the query returns results) to ensure that the query completes successfully. Calling another execute* + * method, or close() before query completion would result in the async query getting killed if it + * is not already finished. Note: This method is a public API for usage outside of Hive, although + * it is not part of the interface java.sql.Statement.
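+ * A minimal usage sketch (illustrative only; the connection object and SQL text are placeholders, not part of this patch): + * <pre> + * HiveStatement stmt = (HiveStatement) con.createStatement(); + * boolean hasResultSet = stmt.executeAsync(sql); + * if (hasResultSet) { + * ResultSet rs = stmt.getResultSet(); + * while (rs.next()) { // blocks until the async query completes + * // process the row + * } + * } else { + * stmt.getUpdateCount(); // blocks until the async query completes + * } + * </pre>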
+ * + * @param sql + * @return true if the first result is a ResultSet object; false if it is an update count or there + * are no results + * @throws SQLException + */ + public boolean executeAsync(String sql) throws SQLException { + runAsyncOnServer(sql); + if (!stmtHandle.isHasResultSet()) { + return false; + } + resultSet = + new HiveQueryResultSet.Builder(this).setClient(client).setSessionHandle(sessHandle) + .setStmtHandle(stmtHandle).setMaxRows(maxRows).setFetchSize(fetchSize) + .setScrollable(isScrollableResultset).build(); + return true; + } + + private void runAsyncOnServer(String sql) throws SQLException { checkConnection("execute"); closeClientOperation(); @@ -265,13 +307,14 @@ public boolean execute(String sql) throws SQLException { isExecuteStatementFailed = true; throw new SQLException(ex.toString(), "08S01", ex); } + } + void waitForOperationToComplete() throws SQLException { TGetOperationStatusReq statusReq = new TGetOperationStatusReq(stmtHandle); - boolean operationComplete = false; TGetOperationStatusResp statusResp; // Poll on the operation status, till the operation is complete - while (!operationComplete) { + while (!isOperationComplete) { try { /** * For an async SQLOperation, GetOperationStatus will use the long polling approach @@ -283,7 +326,8 @@ public boolean execute(String sql) throws SQLException { switch (statusResp.getOperationState()) { case CLOSED_STATE: case FINISHED_STATE: - operationComplete = true; + isOperationComplete = true; + isLogBeingGenerated = false; break; case CANCELED_STATE: // 01000 -> warning @@ -308,17 +352,6 @@ public boolean execute(String sql) throws SQLException { throw new SQLException(e.toString(), "08S01", e); } } - isLogBeingGenerated = false; - - // The query should be completed by now - if (!stmtHandle.isHasResultSet()) { - return false; - } - resultSet = new HiveQueryResultSet.Builder(this).setClient(client).setSessionHandle(sessHandle) - .setStmtHandle(stmtHandle).setMaxRows(maxRows).setFetchSize(fetchSize) - .setScrollable(isScrollableResultset) - .build(); - return true; } private void checkConnection(String action) throws SQLException { @@ -332,6 +365,7 @@ private void initFlags() { isQueryClosed = false; isLogBeingGenerated = true; isExecuteStatementFailed = false; + isOperationComplete = false; } /* @@ -592,10 +626,15 @@ public int getResultSetType() throws SQLException { * * @see java.sql.Statement#getUpdateCount() */ - @Override public int getUpdateCount() throws SQLException { checkConnection("getUpdateCount"); + /** + * Poll on the operation status, till the operation is complete. We need to do this because a + * client might end up using executeAsync and then call this method to check if the query run is + * finished.
+ */ + waitForOperationToComplete(); + return -1; } diff --git a/service/src/java/org/apache/hive/service/cli/operation/GetCatalogsOperation.java b/service/src/java/org/apache/hive/service/cli/operation/GetCatalogsOperation.java index 8868ec1..2eeee47 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/GetCatalogsOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/GetCatalogsOperation.java @@ -18,6 +18,9 @@ package org.apache.hive.service.cli.operation; +import java.util.ArrayList; +import java.util.Arrays; + import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType; import org.apache.hive.service.cli.FetchOrientation; import org.apache.hive.service.cli.HiveSQLException; @@ -71,7 +74,7 @@ public TableSchema getResultSetSchema() throws HiveSQLException { */ @Override public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException { - assertState(OperationState.FINISHED); + assertState(new ArrayList<OperationState>(Arrays.asList(OperationState.FINISHED))); validateDefaultFetchOrientation(orientation); if (orientation.equals(FetchOrientation.FETCH_FIRST)) { rowSet.setStartOffset(0); diff --git a/service/src/java/org/apache/hive/service/cli/operation/GetColumnsOperation.java b/service/src/java/org/apache/hive/service/cli/operation/GetColumnsOperation.java index 35b6c52..574a757 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/GetColumnsOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/GetColumnsOperation.java @@ -20,6 +20,7 @@ import java.sql.DatabaseMetaData; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -221,7 +222,7 @@ public void runInternal() throws HiveSQLException { */ @Override public TableSchema getResultSetSchema() throws HiveSQLException { - assertState(OperationState.FINISHED); + assertState(new ArrayList<OperationState>(Arrays.asList(OperationState.FINISHED))); return RESULT_SET_SCHEMA; } @@ -230,7 +231,7 @@ public TableSchema getResultSetSchema() throws HiveSQLException { */ @Override public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException { - assertState(OperationState.FINISHED); + assertState(new ArrayList<OperationState>(Arrays.asList(OperationState.FINISHED))); validateDefaultFetchOrientation(orientation); if (orientation.equals(FetchOrientation.FETCH_FIRST)) { rowSet.setStartOffset(0); diff --git a/service/src/java/org/apache/hive/service/cli/operation/GetFunctionsOperation.java b/service/src/java/org/apache/hive/service/cli/operation/GetFunctionsOperation.java index 8db2e62..d774f4f95 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/GetFunctionsOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/GetFunctionsOperation.java @@ -19,6 +19,8 @@ package org.apache.hive.service.cli.operation; import java.sql.DatabaseMetaData; +import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Set; @@ -128,7 +130,7 @@ public void runInternal() throws HiveSQLException { */ @Override public TableSchema getResultSetSchema() throws HiveSQLException { - assertState(OperationState.FINISHED); + assertState(new ArrayList<OperationState>(Arrays.asList(OperationState.FINISHED))); return RESULT_SET_SCHEMA; } @@ -137,7 +139,7 @@ public TableSchema getResultSetSchema() throws HiveSQLException { */ @Override public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException { -
assertState(OperationState.FINISHED); + assertState(new ArrayList<OperationState>(Arrays.asList(OperationState.FINISHED))); validateDefaultFetchOrientation(orientation); if (orientation.equals(FetchOrientation.FETCH_FIRST)) { rowSet.setStartOffset(0); diff --git a/service/src/java/org/apache/hive/service/cli/operation/GetSchemasOperation.java b/service/src/java/org/apache/hive/service/cli/operation/GetSchemasOperation.java index d6f6280..dc0a3dd 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/GetSchemasOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/GetSchemasOperation.java @@ -18,6 +18,9 @@ package org.apache.hive.service.cli.operation; +import java.util.ArrayList; +import java.util.Arrays; + import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType; import org.apache.hive.service.cli.FetchOrientation; @@ -77,7 +80,7 @@ public void runInternal() throws HiveSQLException { */ @Override public TableSchema getResultSetSchema() throws HiveSQLException { - assertState(OperationState.FINISHED); + assertState(new ArrayList<OperationState>(Arrays.asList(OperationState.FINISHED))); return RESULT_SET_SCHEMA; } @@ -86,7 +89,7 @@ public TableSchema getResultSetSchema() throws HiveSQLException { */ @Override public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException { - assertState(OperationState.FINISHED); + assertState(new ArrayList<OperationState>(Arrays.asList(OperationState.FINISHED))); validateDefaultFetchOrientation(orientation); if (orientation.equals(FetchOrientation.FETCH_FIRST)) { rowSet.setStartOffset(0); diff --git a/service/src/java/org/apache/hive/service/cli/operation/GetTableTypesOperation.java b/service/src/java/org/apache/hive/service/cli/operation/GetTableTypesOperation.java index a09b39a..13d5b37 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/GetTableTypesOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/GetTableTypesOperation.java @@ -18,6 +18,9 @@ package org.apache.hive.service.cli.operation; +import java.util.ArrayList; +import java.util.Arrays; + import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType; @@ -73,7 +76,7 @@ public void runInternal() throws HiveSQLException { */ @Override public TableSchema getResultSetSchema() throws HiveSQLException { - assertState(OperationState.FINISHED); + assertState(new ArrayList<OperationState>(Arrays.asList(OperationState.FINISHED))); return RESULT_SET_SCHEMA; } @@ -82,7 +85,7 @@ public TableSchema getResultSetSchema() throws HiveSQLException { */ @Override public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException { - assertState(OperationState.FINISHED); + assertState(new ArrayList<OperationState>(Arrays.asList(OperationState.FINISHED))); validateDefaultFetchOrientation(orientation); if (orientation.equals(FetchOrientation.FETCH_FIRST)) { rowSet.setStartOffset(0); diff --git a/service/src/java/org/apache/hive/service/cli/operation/GetTablesOperation.java b/service/src/java/org/apache/hive/service/cli/operation/GetTablesOperation.java index 740b851..aac3692 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/GetTablesOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/GetTablesOperation.java @@ -60,9 +60,9 @@ .addStringColumn("TYPE_CAT", "The types catalog.") .addStringColumn("TYPE_SCHEM", "The types schema.")
.addStringColumn("TYPE_NAME", "Type name.") - .addStringColumn("SELF_REFERENCING_COL_NAME", + .addStringColumn("SELF_REFERENCING_COL_NAME", "Name of the designated \"identifier\" column of a typed table.") - .addStringColumn("REF_GENERATION", + .addStringColumn("REF_GENERATION", "Specifies how values in SELF_REFERENCING_COL_NAME are created."); protected GetTablesOperation(HiveSession parentSession, @@ -102,7 +102,7 @@ public void runInternal() throws HiveSQLException { String tablePattern = convertIdentifierPattern(tableName, true); - for (TableMeta tableMeta : + for (TableMeta tableMeta : metastoreClient.getTableMeta(schemaPattern, tablePattern, tableTypeList)) { rowSet.addRow(new Object[] { DEFAULT_HIVE_CATALOG, @@ -125,7 +125,7 @@ public void runInternal() throws HiveSQLException { */ @Override public TableSchema getResultSetSchema() throws HiveSQLException { - assertState(OperationState.FINISHED); + assertState(new ArrayList(Arrays.asList(OperationState.FINISHED))); return RESULT_SET_SCHEMA; } @@ -134,7 +134,7 @@ public TableSchema getResultSetSchema() throws HiveSQLException { */ @Override public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException { - assertState(OperationState.FINISHED); + assertState(new ArrayList(Arrays.asList(OperationState.FINISHED))); validateDefaultFetchOrientation(orientation); if (orientation.equals(FetchOrientation.FETCH_FIRST)) { rowSet.setStartOffset(0); diff --git a/service/src/java/org/apache/hive/service/cli/operation/GetTypeInfoOperation.java b/service/src/java/org/apache/hive/service/cli/operation/GetTypeInfoOperation.java index 2a0fec2..53660af 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/GetTypeInfoOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/GetTypeInfoOperation.java @@ -18,6 +18,9 @@ package org.apache.hive.service.cli.operation; +import java.util.ArrayList; +import java.util.Arrays; + import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType; import org.apache.hive.service.cli.FetchOrientation; import org.apache.hive.service.cli.HiveSQLException; @@ -123,7 +126,7 @@ public void runInternal() throws HiveSQLException { */ @Override public TableSchema getResultSetSchema() throws HiveSQLException { - assertState(OperationState.FINISHED); + assertState(new ArrayList(Arrays.asList(OperationState.FINISHED))); return RESULT_SET_SCHEMA; } @@ -132,7 +135,7 @@ public TableSchema getResultSetSchema() throws HiveSQLException { */ @Override public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException { - assertState(OperationState.FINISHED); + assertState(new ArrayList(Arrays.asList(OperationState.FINISHED))); validateDefaultFetchOrientation(orientation); if (orientation.equals(FetchOrientation.FETCH_FIRST)) { rowSet.setStartOffset(0); diff --git a/service/src/java/org/apache/hive/service/cli/operation/Operation.java b/service/src/java/org/apache/hive/service/cli/operation/Operation.java index b7d6549..4f3e9c2 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/Operation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/Operation.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.EnumSet; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.Future; @@ -199,9 +200,10 @@ protected void setOperationException(HiveSQLException operationException) { this.operationException = operationException; } - 
protected final void assertState(OperationState state) throws HiveSQLException { - if (this.state != state) { - throw new HiveSQLException("Expected state " + state + ", but found " + this.state); + protected final void assertState(List<OperationState> states) throws HiveSQLException { + if (!states.contains(state)) { + throw new HiveSQLException("Expected states: " + states.toString() + ", but found " + + this.state); } this.lastAccessTime = System.currentTimeMillis(); } diff --git a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java index 9ce6055..ce06c1c 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java @@ -343,7 +343,10 @@ public void close() throws HiveSQLException { @Override public TableSchema getResultSetSchema() throws HiveSQLException { - assertState(OperationState.FINISHED); + // Since compilation is always a blocking RPC call, and schema is ready after compilation, + // we can return when we are in the RUNNING state. + assertState(new ArrayList<OperationState>(Arrays.asList(OperationState.RUNNING, + OperationState.FINISHED))); if (resultSchema == null) { resultSchema = new TableSchema(driver.getSchema()); } @@ -355,7 +358,7 @@ public TableSchema getResultSetSchema() throws HiveSQLException { @Override public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException { validateDefaultFetchOrientation(orientation); - assertState(OperationState.FINISHED); + assertState(new ArrayList<OperationState>(Arrays.asList(OperationState.FINISHED))); RowSet rowSet = RowSetFactory.create(resultSchema, getProtocolVersion());