diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveQueryResultSet.java b/jdbc/src/java/org/apache/hive/jdbc/HiveQueryResultSet.java index 8f67209..6c66c6b 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/HiveQueryResultSet.java +++ b/jdbc/src/java/org/apache/hive/jdbc/HiveQueryResultSet.java @@ -347,6 +347,16 @@ public boolean next() throws SQLException { return false; } + /** + * Poll on the operation status, till the operation is complete. + * We need to wait only for HiveStatement to complete. + * HiveDatabaseMetaData which also uses this ResultSet returns only after the RPC is complete. + */ + if ((statement != null) && (statement instanceof HiveStatement)) { + ((HiveStatement) statement).waitForOperationToComplete(); + ((HiveStatement) statement).setIsLogBeingGenerated(false); + } + try { TFetchOrientation orientation = TFetchOrientation.FETCH_NEXT; if (fetchFirst) { diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java index b4dba44..696f69a 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java +++ b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java @@ -238,6 +238,46 @@ public void closeOnCompletion() throws SQLException { @Override public boolean execute(String sql) throws SQLException { + runAsyncOnServer(sql); + waitForOperationToComplete(); + isLogBeingGenerated = false; + + // The query should be completed by now + if (!stmtHandle.isHasResultSet()) { + return false; + } + resultSet = new HiveQueryResultSet.Builder(this).setClient(client).setSessionHandle(sessHandle) + .setStmtHandle(stmtHandle).setMaxRows(maxRows).setFetchSize(fetchSize) + .setScrollable(isScrollableResultset) + .build(); + return true; + } + + /** + * Starts the query execution asynchronously on the server, and immediately returns to the client. + * The client subsequently blocks on ResultSet#next or Statement#getUpdateCount, depending on the + * query type. + * Note: This method is a public API for usage outside of Hive, although it is not + * part of the interface java.sql.Statement. + * + * @param sql + * @return true if the first result is a ResultSet object; false if it is an update count or there + * are no results + * @throws SQLException + */ + public boolean executeAsync(String sql) throws SQLException { + runAsyncOnServer(sql); + if (!stmtHandle.isHasResultSet()) { + return false; + } + resultSet = new HiveQueryResultSet.Builder(this).setClient(client).setSessionHandle(sessHandle) + .setStmtHandle(stmtHandle).setMaxRows(maxRows).setFetchSize(fetchSize) + .setScrollable(isScrollableResultset) + .build(); + return true; + } + + private void runAsyncOnServer(String sql) throws SQLException { checkConnection("execute"); closeClientOperation(); @@ -265,7 +305,9 @@ public boolean execute(String sql) throws SQLException { isExecuteStatementFailed = true; throw new SQLException(ex.toString(), "08S01", ex); } + } + void waitForOperationToComplete() throws SQLException { TGetOperationStatusReq statusReq = new TGetOperationStatusReq(stmtHandle); boolean operationComplete = false; TGetOperationStatusResp statusResp; @@ -308,17 +350,6 @@ public boolean execute(String sql) throws SQLException { throw new SQLException(e.toString(), "08S01", e); } } - isLogBeingGenerated = false; - - // The query should be completed by now - if (!stmtHandle.isHasResultSet()) { - return false; - } - resultSet = new HiveQueryResultSet.Builder(this).setClient(client).setSessionHandle(sessHandle) - .setStmtHandle(stmtHandle).setMaxRows(maxRows).setFetchSize(fetchSize) - .setScrollable(isScrollableResultset) - .build(); - return true; } private void checkConnection(String action) throws SQLException { @@ -592,10 +623,16 @@ public int getResultSetType() throws SQLException { * * @see java.sql.Statement#getUpdateCount() */ - @Override public int getUpdateCount() throws SQLException { checkConnection("getUpdateCount"); + /** + * Poll on the operation status, till the operation is complete. We want to ensure that since a + * client might end up using executeAsync and then call this to check if the query run is + * finished. + */ + waitForOperationToComplete(); + isLogBeingGenerated = false; return -1; } @@ -856,4 +893,8 @@ private TFetchOrientation getFetchOrientation(boolean incremental) { return TFetchOrientation.FETCH_FIRST; } } + + void setIsLogBeingGenerated(boolean isLogBeingGenerated) { + this.isLogBeingGenerated = isLogBeingGenerated; + } } diff --git a/service/src/java/org/apache/hive/service/cli/operation/GetCatalogsOperation.java b/service/src/java/org/apache/hive/service/cli/operation/GetCatalogsOperation.java index 8868ec1..2eeee47 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/GetCatalogsOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/GetCatalogsOperation.java @@ -18,6 +18,9 @@ package org.apache.hive.service.cli.operation; +import java.util.ArrayList; +import java.util.Arrays; + import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType; import org.apache.hive.service.cli.FetchOrientation; import org.apache.hive.service.cli.HiveSQLException; @@ -71,7 +74,7 @@ public TableSchema getResultSetSchema() throws HiveSQLException { */ @Override public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException { - assertState(OperationState.FINISHED); + assertState(new ArrayList(Arrays.asList(OperationState.FINISHED))); validateDefaultFetchOrientation(orientation); if (orientation.equals(FetchOrientation.FETCH_FIRST)) { rowSet.setStartOffset(0); diff --git a/service/src/java/org/apache/hive/service/cli/operation/GetColumnsOperation.java b/service/src/java/org/apache/hive/service/cli/operation/GetColumnsOperation.java index 35b6c52..574a757 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/GetColumnsOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/GetColumnsOperation.java @@ -20,6 +20,7 @@ import java.sql.DatabaseMetaData; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -221,7 +222,7 @@ public void runInternal() throws HiveSQLException { */ @Override public TableSchema getResultSetSchema() throws HiveSQLException { - assertState(OperationState.FINISHED); + assertState(new ArrayList(Arrays.asList(OperationState.FINISHED))); return RESULT_SET_SCHEMA; } @@ -230,7 +231,7 @@ public TableSchema getResultSetSchema() throws HiveSQLException { */ @Override public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException { - assertState(OperationState.FINISHED); + assertState(new ArrayList(Arrays.asList(OperationState.FINISHED))); validateDefaultFetchOrientation(orientation); if (orientation.equals(FetchOrientation.FETCH_FIRST)) { rowSet.setStartOffset(0); diff --git a/service/src/java/org/apache/hive/service/cli/operation/GetFunctionsOperation.java b/service/src/java/org/apache/hive/service/cli/operation/GetFunctionsOperation.java index 8db2e62..d774f4f95 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/GetFunctionsOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/GetFunctionsOperation.java @@ -19,6 +19,8 @@ package org.apache.hive.service.cli.operation; import java.sql.DatabaseMetaData; +import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Set; @@ -128,7 +130,7 @@ public void runInternal() throws HiveSQLException { */ @Override public TableSchema getResultSetSchema() throws HiveSQLException { - assertState(OperationState.FINISHED); + assertState(new ArrayList(Arrays.asList(OperationState.FINISHED))); return RESULT_SET_SCHEMA; } @@ -137,7 +139,7 @@ public TableSchema getResultSetSchema() throws HiveSQLException { */ @Override public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException { - assertState(OperationState.FINISHED); + assertState(new ArrayList(Arrays.asList(OperationState.FINISHED))); validateDefaultFetchOrientation(orientation); if (orientation.equals(FetchOrientation.FETCH_FIRST)) { rowSet.setStartOffset(0); diff --git a/service/src/java/org/apache/hive/service/cli/operation/GetSchemasOperation.java b/service/src/java/org/apache/hive/service/cli/operation/GetSchemasOperation.java index d6f6280..dc0a3dd 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/GetSchemasOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/GetSchemasOperation.java @@ -18,6 +18,9 @@ package org.apache.hive.service.cli.operation; +import java.util.ArrayList; +import java.util.Arrays; + import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType; import org.apache.hive.service.cli.FetchOrientation; @@ -77,7 +80,7 @@ public void runInternal() throws HiveSQLException { */ @Override public TableSchema getResultSetSchema() throws HiveSQLException { - assertState(OperationState.FINISHED); + assertState(new ArrayList(Arrays.asList(OperationState.FINISHED))); return RESULT_SET_SCHEMA; } @@ -86,7 +89,7 @@ public TableSchema getResultSetSchema() throws HiveSQLException { */ @Override public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException { - assertState(OperationState.FINISHED); + assertState(new ArrayList(Arrays.asList(OperationState.FINISHED))); validateDefaultFetchOrientation(orientation); if (orientation.equals(FetchOrientation.FETCH_FIRST)) { rowSet.setStartOffset(0); diff --git a/service/src/java/org/apache/hive/service/cli/operation/GetTableTypesOperation.java b/service/src/java/org/apache/hive/service/cli/operation/GetTableTypesOperation.java index a09b39a..13d5b37 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/GetTableTypesOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/GetTableTypesOperation.java @@ -18,6 +18,9 @@ package org.apache.hive.service.cli.operation; +import java.util.ArrayList; +import java.util.Arrays; + import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType; @@ -73,7 +76,7 @@ public void runInternal() throws HiveSQLException { */ @Override public TableSchema getResultSetSchema() throws HiveSQLException { - assertState(OperationState.FINISHED); + assertState(new ArrayList(Arrays.asList(OperationState.FINISHED))); return RESULT_SET_SCHEMA; } @@ -82,7 +85,7 @@ public TableSchema getResultSetSchema() throws HiveSQLException { */ @Override public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException { - assertState(OperationState.FINISHED); + assertState(new ArrayList(Arrays.asList(OperationState.FINISHED))); validateDefaultFetchOrientation(orientation); if (orientation.equals(FetchOrientation.FETCH_FIRST)) { rowSet.setStartOffset(0); diff --git a/service/src/java/org/apache/hive/service/cli/operation/GetTablesOperation.java b/service/src/java/org/apache/hive/service/cli/operation/GetTablesOperation.java index 740b851..aac3692 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/GetTablesOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/GetTablesOperation.java @@ -60,9 +60,9 @@ .addStringColumn("TYPE_CAT", "The types catalog.") .addStringColumn("TYPE_SCHEM", "The types schema.") .addStringColumn("TYPE_NAME", "Type name.") - .addStringColumn("SELF_REFERENCING_COL_NAME", + .addStringColumn("SELF_REFERENCING_COL_NAME", "Name of the designated \"identifier\" column of a typed table.") - .addStringColumn("REF_GENERATION", + .addStringColumn("REF_GENERATION", "Specifies how values in SELF_REFERENCING_COL_NAME are created."); protected GetTablesOperation(HiveSession parentSession, @@ -102,7 +102,7 @@ public void runInternal() throws HiveSQLException { String tablePattern = convertIdentifierPattern(tableName, true); - for (TableMeta tableMeta : + for (TableMeta tableMeta : metastoreClient.getTableMeta(schemaPattern, tablePattern, tableTypeList)) { rowSet.addRow(new Object[] { DEFAULT_HIVE_CATALOG, @@ -125,7 +125,7 @@ public void runInternal() throws HiveSQLException { */ @Override public TableSchema getResultSetSchema() throws HiveSQLException { - assertState(OperationState.FINISHED); + assertState(new ArrayList(Arrays.asList(OperationState.FINISHED))); return RESULT_SET_SCHEMA; } @@ -134,7 +134,7 @@ public TableSchema getResultSetSchema() throws HiveSQLException { */ @Override public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException { - assertState(OperationState.FINISHED); + assertState(new ArrayList(Arrays.asList(OperationState.FINISHED))); validateDefaultFetchOrientation(orientation); if (orientation.equals(FetchOrientation.FETCH_FIRST)) { rowSet.setStartOffset(0); diff --git a/service/src/java/org/apache/hive/service/cli/operation/GetTypeInfoOperation.java b/service/src/java/org/apache/hive/service/cli/operation/GetTypeInfoOperation.java index 2a0fec2..53660af 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/GetTypeInfoOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/GetTypeInfoOperation.java @@ -18,6 +18,9 @@ package org.apache.hive.service.cli.operation; +import java.util.ArrayList; +import java.util.Arrays; + import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType; import org.apache.hive.service.cli.FetchOrientation; import org.apache.hive.service.cli.HiveSQLException; @@ -123,7 +126,7 @@ public void runInternal() throws HiveSQLException { */ @Override public TableSchema getResultSetSchema() throws HiveSQLException { - assertState(OperationState.FINISHED); + assertState(new ArrayList(Arrays.asList(OperationState.FINISHED))); return RESULT_SET_SCHEMA; } @@ -132,7 +135,7 @@ public TableSchema getResultSetSchema() throws HiveSQLException { */ @Override public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException { - assertState(OperationState.FINISHED); + assertState(new ArrayList(Arrays.asList(OperationState.FINISHED))); validateDefaultFetchOrientation(orientation); if (orientation.equals(FetchOrientation.FETCH_FIRST)) { rowSet.setStartOffset(0); diff --git a/service/src/java/org/apache/hive/service/cli/operation/Operation.java b/service/src/java/org/apache/hive/service/cli/operation/Operation.java index d9a273b..1cb6ef1 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/Operation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/Operation.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.EnumSet; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.Future; @@ -199,9 +200,14 @@ protected void setOperationException(HiveSQLException operationException) { this.operationException = operationException; } - protected final void assertState(OperationState state) throws HiveSQLException { - if (this.state != state) { - throw new HiveSQLException("Expected state " + state + ", but found " + this.state); + protected final void assertState(List states) throws HiveSQLException { + boolean validStateTransition = false; + if (states.contains(state)) { + validStateTransition = true; + } + if (!validStateTransition) { + throw new HiveSQLException("Expected states: " + states.toString() + ", but found " + + this.state); } this.lastAccessTime = System.currentTimeMillis(); } diff --git a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java index 04d816a..0d417d9 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java @@ -343,7 +343,10 @@ public void close() throws HiveSQLException { @Override public TableSchema getResultSetSchema() throws HiveSQLException { - assertState(OperationState.FINISHED); + // Since compilation is always a blocking RPC call, and schema is ready after compilation, + // we can return when are in the RUNNING state. + assertState(new ArrayList(Arrays.asList(OperationState.RUNNING, + OperationState.FINISHED))); if (resultSchema == null) { resultSchema = new TableSchema(driver.getSchema()); } @@ -355,7 +358,7 @@ public TableSchema getResultSetSchema() throws HiveSQLException { @Override public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException { validateDefaultFetchOrientation(orientation); - assertState(OperationState.FINISHED); + assertState(new ArrayList(Arrays.asList(OperationState.FINISHED))); RowSet rowSet = RowSetFactory.create(resultSchema, getProtocolVersion());